//! Runs `!Send` futures on the current thread.
use crate::loom::sync::{Arc, Mutex};
use crate::runtime::task::{self, JoinHandle, LocalOwnedTasks, Task};
use crate::sync::AtomicWaker;
use crate::util::VecDequeCell;

use std::cell::Cell;
use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::Poll;

use pin_project_lite::pin_project;

cfg_rt! {
    /// A set of tasks which are executed on the same thread.
    ///
    /// In some cases, it is necessary to run one or more futures that do not
    /// implement [`Send`] and thus are unsafe to send between threads. In these
    /// cases, a [local task set] may be used to schedule one or more `!Send`
    /// futures to run together on the same thread.
    ///
    /// For example, the following code will not compile:
    ///
    /// ```rust,compile_fail
    /// use std::rc::Rc;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     // `Rc` does not implement `Send`, and thus may not be sent between
    ///     // threads safely.
    ///     let unsend_data = Rc::new("my unsend data...");
    ///
    ///     let unsend_data = unsend_data.clone();
    ///     // Because the `async` block here moves `unsend_data`, the future is `!Send`.
    ///     // Since `tokio::spawn` requires the spawned future to implement `Send`, this
    ///     // will not compile.
    ///     tokio::spawn(async move {
    ///         println!("{}", unsend_data);
    ///         // ...
    ///     }).await.unwrap();
    /// }
    /// ```
    ///
    /// # Use with `run_until`
    ///
    /// To spawn `!Send` futures, we can use a local task set to schedule them
    /// on the thread calling [`Runtime::block_on`]. When running inside of the
    /// local task set, we can use [`task::spawn_local`], which can spawn
    /// `!Send` futures. For example:
    ///
    /// ```rust
    /// use std::rc::Rc;
    /// use tokio::task;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let unsend_data = Rc::new("my unsend data...");
    ///
    ///     // Construct a local task set that can run `!Send` futures.
    ///     let local = task::LocalSet::new();
    ///
    ///     // Run the local task set.
    ///     local.run_until(async move {
    ///         let unsend_data = unsend_data.clone();
    ///         // `spawn_local` ensures that the future is spawned on the local
    ///         // task set.
    ///         task::spawn_local(async move {
    ///             println!("{}", unsend_data);
    ///             // ...
    ///         }).await.unwrap();
    ///     }).await;
    /// }
    /// ```
    /// **Note:** The `run_until` method can only be used in `#[tokio::main]`,
    /// `#[tokio::test]` or directly inside a call to [`Runtime::block_on`]. It
    /// cannot be used inside a task spawned with `tokio::spawn`.
    ///
    /// ## Awaiting a `LocalSet`
    ///
    /// Additionally, a `LocalSet` itself implements `Future`, completing when
    /// *all* tasks spawned on the `LocalSet` complete. This can be used to run
    /// several futures on a `LocalSet` and drive the whole set until they
    /// complete. For example,
    ///
    /// ```rust
    /// use tokio::{task, time};
    /// use std::rc::Rc;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let unsend_data = Rc::new("world");
    ///     let local = task::LocalSet::new();
    ///
    ///     let unsend_data2 = unsend_data.clone();
    ///     local.spawn_local(async move {
    ///         // ...
    ///         println!("hello {}", unsend_data2)
    ///     });
    ///
    ///     local.spawn_local(async move {
    ///         time::sleep(time::Duration::from_millis(100)).await;
    ///         println!("goodbye {}", unsend_data)
    ///     });
    ///
    ///     // ...
    ///
    ///     local.await;
    /// }
    /// ```
    /// **Note:** Awaiting a `LocalSet` can only be done inside
    /// `#[tokio::main]`, `#[tokio::test]` or directly inside a call to
    /// [`Runtime::block_on`]. It cannot be used inside a task spawned with
    /// `tokio::spawn`.
    ///
    /// ## Use inside `tokio::spawn`
    ///
    /// The two methods mentioned above cannot be used inside `tokio::spawn`, so
    /// to spawn `!Send` futures from inside `tokio::spawn`, we need to do
    /// something else. The solution is to create the `LocalSet` somewhere else,
    /// and communicate with it using an [`mpsc`] channel.
    ///
    /// The following example puts the `LocalSet` inside a new thread.
    /// ```
    /// use tokio::runtime::Builder;
    /// use tokio::sync::{mpsc, oneshot};
    /// use tokio::task::LocalSet;
    ///
    /// // This struct describes the task you want to spawn. Here we include
    /// // some simple examples. The oneshot channel allows sending a response
    /// // to the spawner.
    /// #[derive(Debug)]
    /// enum Task {
    ///     PrintNumber(u32),
    ///     AddOne(u32, oneshot::Sender<u32>),
    /// }
    ///
    /// #[derive(Clone)]
    /// struct LocalSpawner {
    ///    send: mpsc::UnboundedSender<Task>,
    /// }
    ///
    /// impl LocalSpawner {
    ///     pub fn new() -> Self {
    ///         let (send, mut recv) = mpsc::unbounded_channel();
    ///
    ///         let rt = Builder::new_current_thread()
    ///             .enable_all()
    ///             .build()
    ///             .unwrap();
    ///
    ///         std::thread::spawn(move || {
    ///             let local = LocalSet::new();
    ///
    ///             local.spawn_local(async move {
    ///                 while let Some(new_task) = recv.recv().await {
    ///                     tokio::task::spawn_local(run_task(new_task));
    ///                 }
    ///                 // If the while loop returns, then all the LocalSpawner
    ///                 // objects have been dropped.
    ///             });
    ///
    ///             // This will return once all senders are dropped and all
    ///             // spawned tasks have returned.
    ///             rt.block_on(local);
    ///         });
    ///
    ///         Self {
    ///             send,
    ///         }
    ///     }
    ///
    ///     pub fn spawn(&self, task: Task) {
    ///         self.send.send(task).expect("Thread with LocalSet has shut down.");
    ///     }
    /// }
    ///
    /// // This task may do !Send stuff. We use printing a number as an example,
    /// // but it could be anything.
    /// //
    /// // The Task struct is an enum to support spawning many different kinds
    /// // of operations.
    /// async fn run_task(task: Task) {
    ///     match task {
    ///         Task::PrintNumber(n) => {
    ///             println!("{}", n);
    ///         },
    ///         Task::AddOne(n, response) => {
    ///             // We ignore failures to send the response.
    ///             let _ = response.send(n + 1);
    ///         },
    ///     }
    /// }
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let spawner = LocalSpawner::new();
    ///
    ///     let (send, response) = oneshot::channel();
    ///     spawner.spawn(Task::AddOne(10, send));
    ///     let eleven = response.await.unwrap();
    ///     assert_eq!(eleven, 11);
    /// }
    /// ```
    ///
    /// [`Send`]: trait@std::marker::Send
    /// [local task set]: struct@LocalSet
    /// [`Runtime::block_on`]: method@crate::runtime::Runtime::block_on
    /// [`task::spawn_local`]: fn@spawn_local
    /// [`mpsc`]: mod@crate::sync::mpsc
    pub struct LocalSet {
        /// Current scheduler tick.
        ///
        /// Incremented (wrapping) by every `next_task` call, which uses it to
        /// decide when the remote queue is checked before the local one.
        tick: Cell<u8>,

        /// State available from thread-local.
        ///
        /// This is the value installed into `CURRENT` while the set is being
        /// driven or dropped.
        context: Context,

        /// This type should not be Send.
        ///
        /// `*const ()` is neither `Send` nor `Sync`, so this zero-sized marker
        /// removes both auto traits from `LocalSet`.
        _not_send: PhantomData<*const ()>,
    }
}

/// State available from the thread-local.
struct Context {
    /// Collection of all active tasks spawned onto this executor.
    owned: LocalOwnedTasks<Arc<Shared>>,

    /// Local run queue.
    ///
    /// `Shared::schedule` pushes here when it is called while this context is
    /// the one installed in `CURRENT`; `next_task` pops from it.
    queue: VecDequeCell<task::Notified<Arc<Shared>>>,

    /// State shared between threads.
    shared: Arc<Shared>,
}

/// LocalSet state shared between threads.
238 struct Shared { 239 /// Remote run queue sender. 240 queue: Mutex<Option<VecDeque<task::Notified<Arc<Shared>>>>>, 241 242 /// Wake the `LocalSet` task. 243 waker: AtomicWaker, 244 } 245 246 pin_project! { 247 #[derive(Debug)] 248 struct RunUntil<'a, F> { 249 local_set: &'a LocalSet, 250 #[pin] 251 future: F, 252 } 253 } 254 255 scoped_thread_local!(static CURRENT: Context); 256 257 cfg_rt! { 258 /// Spawns a `!Send` future on the local task set. 259 /// 260 /// The spawned future will be run on the same thread that called `spawn_local.` 261 /// This may only be called from the context of a local task set. 262 /// 263 /// # Panics 264 /// 265 /// - This function panics if called outside of a local task set. 266 /// 267 /// # Examples 268 /// 269 /// ```rust 270 /// use std::rc::Rc; 271 /// use tokio::task; 272 /// 273 /// #[tokio::main] 274 /// async fn main() { 275 /// let unsend_data = Rc::new("my unsend data..."); 276 /// 277 /// let local = task::LocalSet::new(); 278 /// 279 /// // Run the local task set. 280 /// local.run_until(async move { 281 /// let unsend_data = unsend_data.clone(); 282 /// task::spawn_local(async move { 283 /// println!("{}", unsend_data); 284 /// // ... 
285 /// }).await.unwrap(); 286 /// }).await; 287 /// } 288 /// ``` 289 #[cfg_attr(tokio_track_caller, track_caller)] 290 pub fn spawn_local<F>(future: F) -> JoinHandle<F::Output> 291 where 292 F: Future + 'static, 293 F::Output: 'static, 294 { 295 spawn_local_inner(future, None) 296 } 297 298 pub(super) fn spawn_local_inner<F>(future: F, name: Option<&str>) -> JoinHandle<F::Output> 299 where F: Future + 'static, 300 F::Output: 'static 301 { 302 let future = crate::util::trace::task(future, "local", name); 303 CURRENT.with(|maybe_cx| { 304 let cx = maybe_cx 305 .expect("`spawn_local` called from outside of a `task::LocalSet`"); 306 307 let (handle, notified) = cx.owned.bind(future, cx.shared.clone()); 308 309 if let Some(notified) = notified { 310 cx.shared.schedule(notified); 311 } 312 313 handle 314 }) 315 } 316 } 317 318 /// Initial queue capacity. 319 const INITIAL_CAPACITY: usize = 64; 320 321 /// Max number of tasks to poll per tick. 322 const MAX_TASKS_PER_TICK: usize = 61; 323 324 /// How often it check the remote queue first. 325 const REMOTE_FIRST_INTERVAL: u8 = 31; 326 327 impl LocalSet { 328 /// Returns a new local task set. new() -> LocalSet329 pub fn new() -> LocalSet { 330 LocalSet { 331 tick: Cell::new(0), 332 context: Context { 333 owned: LocalOwnedTasks::new(), 334 queue: VecDequeCell::with_capacity(INITIAL_CAPACITY), 335 shared: Arc::new(Shared { 336 queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))), 337 waker: AtomicWaker::new(), 338 }), 339 }, 340 _not_send: PhantomData, 341 } 342 } 343 344 /// Spawns a `!Send` task onto the local task set. 345 /// 346 /// This task is guaranteed to be run on the current thread. 347 /// 348 /// Unlike the free function [`spawn_local`], this method may be used to 349 /// spawn local tasks when the task set is _not_ running. 
For example: 350 /// ```rust 351 /// use tokio::task; 352 /// 353 /// #[tokio::main] 354 /// async fn main() { 355 /// let local = task::LocalSet::new(); 356 /// 357 /// // Spawn a future on the local set. This future will be run when 358 /// // we call `run_until` to drive the task set. 359 /// local.spawn_local(async { 360 /// // ... 361 /// }); 362 /// 363 /// // Run the local task set. 364 /// local.run_until(async move { 365 /// // ... 366 /// }).await; 367 /// 368 /// // When `run` finishes, we can spawn _more_ futures, which will 369 /// // run in subsequent calls to `run_until`. 370 /// local.spawn_local(async { 371 /// // ... 372 /// }); 373 /// 374 /// local.run_until(async move { 375 /// // ... 376 /// }).await; 377 /// } 378 /// ``` 379 /// [`spawn_local`]: fn@spawn_local 380 #[cfg_attr(tokio_track_caller, track_caller)] spawn_local<F>(&self, future: F) -> JoinHandle<F::Output> where F: Future + 'static, F::Output: 'static,381 pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output> 382 where 383 F: Future + 'static, 384 F::Output: 'static, 385 { 386 let future = crate::util::trace::task(future, "local", None); 387 388 let (handle, notified) = self.context.owned.bind(future, self.context.shared.clone()); 389 390 if let Some(notified) = notified { 391 self.context.shared.schedule(notified); 392 } 393 394 self.context.shared.waker.wake(); 395 handle 396 } 397 398 /// Runs a future to completion on the provided runtime, driving any local 399 /// futures spawned on this task set on the current thread. 400 /// 401 /// This runs the given future on the runtime, blocking until it is 402 /// complete, and yielding its resolved result. Any tasks or timers which 403 /// the future spawns internally will be executed on the runtime. The future 404 /// may also call [`spawn_local`] to spawn_local additional local futures on the 405 /// current thread. 406 /// 407 /// This method should not be called from an asynchronous context. 
408 /// 409 /// # Panics 410 /// 411 /// This function panics if the executor is at capacity, if the provided 412 /// future panics, or if called within an asynchronous execution context. 413 /// 414 /// # Notes 415 /// 416 /// Since this function internally calls [`Runtime::block_on`], and drives 417 /// futures in the local task set inside that call to `block_on`, the local 418 /// futures may not use [in-place blocking]. If a blocking call needs to be 419 /// issued from a local task, the [`spawn_blocking`] API may be used instead. 420 /// 421 /// For example, this will panic: 422 /// ```should_panic 423 /// use tokio::runtime::Runtime; 424 /// use tokio::task; 425 /// 426 /// let rt = Runtime::new().unwrap(); 427 /// let local = task::LocalSet::new(); 428 /// local.block_on(&rt, async { 429 /// let join = task::spawn_local(async { 430 /// let blocking_result = task::block_in_place(|| { 431 /// // ... 432 /// }); 433 /// // ... 434 /// }); 435 /// join.await.unwrap(); 436 /// }) 437 /// ``` 438 /// This, however, will not panic: 439 /// ``` 440 /// use tokio::runtime::Runtime; 441 /// use tokio::task; 442 /// 443 /// let rt = Runtime::new().unwrap(); 444 /// let local = task::LocalSet::new(); 445 /// local.block_on(&rt, async { 446 /// let join = task::spawn_local(async { 447 /// let blocking_result = task::spawn_blocking(|| { 448 /// // ... 449 /// }).await; 450 /// // ... 
451 /// }); 452 /// join.await.unwrap(); 453 /// }) 454 /// ``` 455 /// 456 /// [`spawn_local`]: fn@spawn_local 457 /// [`Runtime::block_on`]: method@crate::runtime::Runtime::block_on 458 /// [in-place blocking]: fn@crate::task::block_in_place 459 /// [`spawn_blocking`]: fn@crate::task::spawn_blocking 460 #[cfg(feature = "rt")] 461 #[cfg_attr(docsrs, doc(cfg(feature = "rt")))] block_on<F>(&self, rt: &crate::runtime::Runtime, future: F) -> F::Output where F: Future,462 pub fn block_on<F>(&self, rt: &crate::runtime::Runtime, future: F) -> F::Output 463 where 464 F: Future, 465 { 466 rt.block_on(self.run_until(future)) 467 } 468 469 /// Runs a future to completion on the local set, returning its output. 470 /// 471 /// This returns a future that runs the given future with a local set, 472 /// allowing it to call [`spawn_local`] to spawn additional `!Send` futures. 473 /// Any local futures spawned on the local set will be driven in the 474 /// background until the future passed to `run_until` completes. When the future 475 /// passed to `run` finishes, any local futures which have not completed 476 /// will remain on the local set, and will be driven on subsequent calls to 477 /// `run_until` or when [awaiting the local set] itself. 478 /// 479 /// # Examples 480 /// 481 /// ```rust 482 /// use tokio::task; 483 /// 484 /// #[tokio::main] 485 /// async fn main() { 486 /// task::LocalSet::new().run_until(async { 487 /// task::spawn_local(async move { 488 /// // ... 489 /// }).await.unwrap(); 490 /// // ... 
491 /// }).await; 492 /// } 493 /// ``` 494 /// 495 /// [`spawn_local`]: fn@spawn_local 496 /// [awaiting the local set]: #awaiting-a-localset run_until<F>(&self, future: F) -> F::Output where F: Future,497 pub async fn run_until<F>(&self, future: F) -> F::Output 498 where 499 F: Future, 500 { 501 let run_until = RunUntil { 502 future, 503 local_set: self, 504 }; 505 run_until.await 506 } 507 508 /// Ticks the scheduler, returning whether the local future needs to be 509 /// notified again. tick(&self) -> bool510 fn tick(&self) -> bool { 511 for _ in 0..MAX_TASKS_PER_TICK { 512 match self.next_task() { 513 // Run the task 514 // 515 // Safety: As spawned tasks are `!Send`, `run_unchecked` must be 516 // used. We are responsible for maintaining the invariant that 517 // `run_unchecked` is only called on threads that spawned the 518 // task initially. Because `LocalSet` itself is `!Send`, and 519 // `spawn_local` spawns into the `LocalSet` on the current 520 // thread, the invariant is maintained. 521 Some(task) => crate::coop::budget(|| task.run()), 522 // We have fully drained the queue of notified tasks, so the 523 // local future doesn't need to be notified again — it can wait 524 // until something else wakes a task in the local set. 
525 None => return false, 526 } 527 } 528 529 true 530 } 531 next_task(&self) -> Option<task::LocalNotified<Arc<Shared>>>532 fn next_task(&self) -> Option<task::LocalNotified<Arc<Shared>>> { 533 let tick = self.tick.get(); 534 self.tick.set(tick.wrapping_add(1)); 535 536 let task = if tick % REMOTE_FIRST_INTERVAL == 0 { 537 self.context 538 .shared 539 .queue 540 .lock() 541 .as_mut() 542 .and_then(|queue| queue.pop_front()) 543 .or_else(|| self.context.queue.pop_front()) 544 } else { 545 self.context.queue.pop_front().or_else(|| { 546 self.context 547 .shared 548 .queue 549 .lock() 550 .as_mut() 551 .and_then(|queue| queue.pop_front()) 552 }) 553 }; 554 555 task.map(|task| self.context.owned.assert_owner(task)) 556 } 557 with<T>(&self, f: impl FnOnce() -> T) -> T558 fn with<T>(&self, f: impl FnOnce() -> T) -> T { 559 CURRENT.set(&self.context, f) 560 } 561 } 562 563 impl fmt::Debug for LocalSet { fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result564 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { 565 fmt.debug_struct("LocalSet").finish() 566 } 567 } 568 569 impl Future for LocalSet { 570 type Output = (); 571 poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output>572 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> { 573 // Register the waker before starting to work 574 self.context.shared.waker.register_by_ref(cx.waker()); 575 576 if self.with(|| self.tick()) { 577 // If `tick` returns true, we need to notify the local future again: 578 // there are still tasks remaining in the run queue. 579 cx.waker().wake_by_ref(); 580 Poll::Pending 581 } else if self.context.owned.is_empty() { 582 // If the scheduler has no remaining futures, we're done! 583 Poll::Ready(()) 584 } else { 585 // There are still futures in the local set, but we've polled all the 586 // futures in the run queue. Therefore, we can just return Pending 587 // since the remaining futures will be woken from somewhere else. 
588 Poll::Pending 589 } 590 } 591 } 592 593 impl Default for LocalSet { default() -> LocalSet594 fn default() -> LocalSet { 595 LocalSet::new() 596 } 597 } 598 599 impl Drop for LocalSet { drop(&mut self)600 fn drop(&mut self) { 601 self.with(|| { 602 // Shut down all tasks in the LocalOwnedTasks and close it to 603 // prevent new tasks from ever being added. 604 self.context.owned.close_and_shutdown_all(); 605 606 // We already called shutdown on all tasks above, so there is no 607 // need to call shutdown. 608 for task in self.context.queue.take() { 609 drop(task); 610 } 611 612 // Take the queue from the Shared object to prevent pushing 613 // notifications to it in the future. 614 let queue = self.context.shared.queue.lock().take().unwrap(); 615 for task in queue { 616 drop(task); 617 } 618 619 assert!(self.context.owned.is_empty()); 620 }); 621 } 622 } 623 624 // === impl LocalFuture === 625 626 impl<T: Future> Future for RunUntil<'_, T> { 627 type Output = T::Output; 628 poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output>629 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> { 630 let me = self.project(); 631 632 me.local_set.with(|| { 633 me.local_set 634 .context 635 .shared 636 .waker 637 .register_by_ref(cx.waker()); 638 639 let _no_blocking = crate::runtime::enter::disallow_blocking(); 640 let f = me.future; 641 642 if let Poll::Ready(output) = crate::coop::budget(|| f.poll(cx)) { 643 return Poll::Ready(output); 644 } 645 646 if me.local_set.tick() { 647 // If `tick` returns `true`, we need to notify the local future again: 648 // there are still tasks remaining in the run queue. 649 cx.waker().wake_by_ref(); 650 } 651 652 Poll::Pending 653 }) 654 } 655 } 656 657 impl Shared { 658 /// Schedule the provided task on the scheduler. 
schedule(&self, task: task::Notified<Arc<Self>>)659 fn schedule(&self, task: task::Notified<Arc<Self>>) { 660 CURRENT.with(|maybe_cx| match maybe_cx { 661 Some(cx) if cx.shared.ptr_eq(self) => { 662 cx.queue.push_back(task); 663 } 664 _ => { 665 // First check whether the queue is still there (if not, the 666 // LocalSet is dropped). Then push to it if so, and if not, 667 // do nothing. 668 let mut lock = self.queue.lock(); 669 670 if let Some(queue) = lock.as_mut() { 671 queue.push_back(task); 672 drop(lock); 673 self.waker.wake(); 674 } 675 } 676 }); 677 } 678 ptr_eq(&self, other: &Shared) -> bool679 fn ptr_eq(&self, other: &Shared) -> bool { 680 std::ptr::eq(self, other) 681 } 682 } 683 684 impl task::Schedule for Arc<Shared> { release(&self, task: &Task<Self>) -> Option<Task<Self>>685 fn release(&self, task: &Task<Self>) -> Option<Task<Self>> { 686 CURRENT.with(|maybe_cx| { 687 let cx = maybe_cx.expect("scheduler context missing"); 688 assert!(cx.shared.ptr_eq(self)); 689 cx.owned.remove(task) 690 }) 691 } 692 schedule(&self, task: task::Notified<Self>)693 fn schedule(&self, task: task::Notified<Self>) { 694 Shared::schedule(self, task); 695 } 696 } 697