use crate::io::interest::Interest;
use crate::io::ready::Ready;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Mutex;
use crate::runtime::io::{Direction, ReadyEvent, Tick};
use crate::util::bit;
use crate::util::linked_list::{self, LinkedList};
use crate::util::WakeList;

use std::cell::UnsafeCell;
use std::future::Future;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::atomic::Ordering::{AcqRel, Acquire};
use std::task::{Context, Poll, Waker};

/// Stored in the I/O driver resource slab.
#[derive(Debug)]
// # This struct should be cache padded to avoid false sharing. The cache padding rules are copied
// from crossbeam-utils/src/cache_padded.rs
//
// Starting from Intel's Sandy Bridge, spatial prefetcher is now pulling pairs of 64-byte cache
// lines at a time, so we have to align to 128 bytes rather than 64.
//
// Sources:
// - https://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf
// - https://github.com/facebook/folly/blob/1b5288e6eea6df074758f877c849b6e73bbb9fbb/folly/lang/Align.h#L107
//
// ARM's big.LITTLE architecture has asymmetric cores and "big" cores have 128-byte cache line size.
//
// Sources:
// - https://www.mono-project.com/news/2016/09/12/arm64-icache/
//
// powerpc64 has 128-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_ppc64x.go#L9
#[cfg_attr(
    any(
        target_arch = "x86_64",
        target_arch = "aarch64",
        target_arch = "powerpc64",
    ),
    repr(align(128))
)]
// arm, mips, mips64, riscv64, sparc, and hexagon have 32-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_arm.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mipsle.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips64x.go#L9
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_riscv64.go#L7
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L17
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/hexagon/include/asm/cache.h#L12
//
// riscv32 is assumed not to exceed the cache line size of riscv64.
#[cfg_attr(
    any(
        target_arch = "arm",
        target_arch = "mips",
        target_arch = "mips64",
        target_arch = "riscv32",
        target_arch = "riscv64",
        target_arch = "sparc",
        target_arch = "hexagon",
    ),
    repr(align(32))
)]
// m68k has 16-byte cache line size.
//
// Sources:
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/m68k/include/asm/cache.h#L9
#[cfg_attr(target_arch = "m68k", repr(align(16)))]
// s390x has 256-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_s390x.go#L7
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/s390/include/asm/cache.h#L13
#[cfg_attr(target_arch = "s390x", repr(align(256)))]
// x86, wasm, and sparc64 have 64-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/dda2991c2ea0c5914714469c4defc2562a907230/src/internal/cpu/cpu_x86.go#L9
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_wasm.go#L7
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L19
//
// All others are assumed to have 64-byte cache line size.
#[cfg_attr(
    not(any(
        target_arch = "x86_64",
        target_arch = "aarch64",
        target_arch = "powerpc64",
        target_arch = "arm",
        target_arch = "mips",
        target_arch = "mips64",
        target_arch = "riscv32",
        target_arch = "riscv64",
        target_arch = "sparc",
        target_arch = "hexagon",
        target_arch = "m68k",
        target_arch = "s390x",
    )),
    repr(align(64))
)]
pub(crate) struct ScheduledIo {
    pub(super) linked_list_pointers: UnsafeCell<linked_list::Pointers<Self>>,

    /// Packs the resource's readiness and the I/O driver's latest tick.
    readiness: AtomicUsize,

    waiters: Mutex<Waiters>,
}

type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;

#[derive(Debug, Default)]
struct Waiters {
    /// List of all current waiters.
    list: WaitList,

    /// Waker used for `AsyncRead`.
    reader: Option<Waker>,

    /// Waker used for `AsyncWrite`.
    writer: Option<Waker>,
}

#[derive(Debug)]
struct Waiter {
    pointers: linked_list::Pointers<Waiter>,

    /// The waker for this task.
    waker: Option<Waker>,

    /// The interest this waiter is waiting on.
    interest: Interest,

    is_ready: bool,

    /// Should never be `Unpin`.
    _p: PhantomPinned,
}

generate_addr_of_methods! {
    impl<> Waiter {
        unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
            &self.pointers
        }
    }
}

/// Future returned by `readiness()`.
struct Readiness<'a> {
    scheduled_io: &'a ScheduledIo,

    state: State,

    /// Entry in the waiter `LinkedList`.
    waiter: UnsafeCell<Waiter>,
}

enum State {
    Init,
    Waiting,
    Done,
}

// The `ScheduledIo::readiness` (`AtomicUsize`) is packed full of goodness.
//
// | shutdown | driver tick | readiness |
// |----------+-------------+-----------|
// |  1 bit   |   8 bits    |  16 bits  |

const READINESS: bit::Pack = bit::Pack::least_significant(16);

const TICK: bit::Pack = READINESS.then(8);

const SHUTDOWN: bit::Pack = TICK.then(1);
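// As an illustrative example of the layout above (added commentary, not from
// the upstream comments): `READINESS` occupies bits 0..16, `TICK` bits 16..24,
// and `SHUTDOWN` bit 24. Packing a readiness value `r` under tick `t` therefore
// yields the word `(t << 16) | r`, and setting the shutdown flag ORs in
// `1 << 24`. `READINESS.unpack`, `TICK.unpack`, and `SHUTDOWN.unpack` each
// recover their field without disturbing the others.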
// ===== impl ScheduledIo =====

impl Default for ScheduledIo {
    fn default() -> ScheduledIo {
        ScheduledIo {
            linked_list_pointers: UnsafeCell::new(linked_list::Pointers::new()),
            readiness: AtomicUsize::new(0),
            waiters: Mutex::new(Default::default()),
        }
    }
}

impl ScheduledIo {
    pub(crate) fn token(&self) -> mio::Token {
        // use `expose_addr` when stable
        mio::Token(self as *const _ as usize)
    }

    /// Invoked when the IO driver is shut down; forces this `ScheduledIo` into a
    /// permanently shutdown state.
    pub(super) fn shutdown(&self) {
        let mask = SHUTDOWN.pack(1, 0);
        self.readiness.fetch_or(mask, AcqRel);
        self.wake(Ready::ALL);
    }

    /// Sets the readiness on this `ScheduledIo` by invoking the given closure on
    /// the current value.
    ///
    /// # Arguments
    /// - `tick`: whether setting the tick or trying to clear readiness for a
    ///   specific tick.
    /// - `f`: a closure returning a new readiness value given the previous
    ///   readiness.
    pub(super) fn set_readiness(&self, tick: Tick, f: impl Fn(Ready) -> Ready) {
        let mut current = self.readiness.load(Acquire);

        // The shutdown bit should not be set
        debug_assert_eq!(0, SHUTDOWN.unpack(current));

        loop {
            // Mask out the tick bits so that the modifying function doesn't see
            // them.
            let current_readiness = Ready::from_usize(current);
            let new = f(current_readiness);

            let next = match tick {
                Tick::Set(t) => TICK.pack(t as usize, new.as_usize()),
                Tick::Clear(t) => {
                    if TICK.unpack(current) as u8 != t {
                        // Trying to clear readiness with an old event!
                        return;
                    }

                    TICK.pack(t as usize, new.as_usize())
                }
            };

            match self
                .readiness
                .compare_exchange(current, next, AcqRel, Acquire)
            {
                Ok(_) => return,
                // we lost the race, retry!
                Err(actual) => current = actual,
            }
        }
    }
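    // A concrete (illustrative) scenario for the `Tick::Clear` guard above: the
    // driver observes a new event and stores readiness under `Tick::Set(5)`. A
    // task still holding a `ReadyEvent { tick: 4, .. }` from an earlier poll
    // then calls `clear_readiness`, which arrives here as `Tick::Clear(4)`.
    // Because the stored tick (5) no longer matches, the clear is ignored and
    // the freshly delivered readiness is preserved.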
    /// Notifies all pending waiters that have registered interest in `ready`.
    ///
    /// There may be many waiters to notify. Waking the pending task **must** be
    /// done from outside of the lock otherwise there is a potential for a
    /// deadlock.
    ///
    /// A stack array of wakers is created and filled with wakers to notify, the
    /// lock is released, and the wakers are notified. Because there may be more
    /// than 32 wakers to notify, if the stack array fills up, the lock is
    /// released, the array is cleared, and the iteration continues.
    pub(super) fn wake(&self, ready: Ready) {
        let mut wakers = WakeList::new();

        let mut waiters = self.waiters.lock();

        // check for AsyncRead slot
        if ready.is_readable() {
            if let Some(waker) = waiters.reader.take() {
                wakers.push(waker);
            }
        }

        // check for AsyncWrite slot
        if ready.is_writable() {
            if let Some(waker) = waiters.writer.take() {
                wakers.push(waker);
            }
        }

        'outer: loop {
            let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest));

            while wakers.can_push() {
                match iter.next() {
                    Some(waiter) => {
                        let waiter = unsafe { &mut *waiter.as_ptr() };

                        if let Some(waker) = waiter.waker.take() {
                            waiter.is_ready = true;
                            wakers.push(waker);
                        }
                    }
                    None => {
                        break 'outer;
                    }
                }
            }

            drop(waiters);

            wakers.wake_all();

            // Acquire the lock again.
            waiters = self.waiters.lock();
        }

        // Release the lock before notifying
        drop(waiters);

        wakers.wake_all();
    }

    pub(super) fn ready_event(&self, interest: Interest) -> ReadyEvent {
        let curr = self.readiness.load(Acquire);

        ReadyEvent {
            tick: TICK.unpack(curr) as u8,
            ready: interest.mask() & Ready::from_usize(READINESS.unpack(curr)),
            is_shutdown: SHUTDOWN.unpack(curr) != 0,
        }
    }

    /// Polls for readiness events in a given direction.
    ///
    /// These are to support `AsyncRead` and `AsyncWrite` polling methods,
    /// which cannot use the `async fn` version. This uses reserved reader
    /// and writer slots.
    pub(super) fn poll_readiness(
        &self,
        cx: &mut Context<'_>,
        direction: Direction,
    ) -> Poll<ReadyEvent> {
        let curr = self.readiness.load(Acquire);

        let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));
        let is_shutdown = SHUTDOWN.unpack(curr) != 0;

        if ready.is_empty() && !is_shutdown {
            // Update the task info
            let mut waiters = self.waiters.lock();
            let slot = match direction {
                Direction::Read => &mut waiters.reader,
                Direction::Write => &mut waiters.writer,
            };

            // Avoid cloning the waker if one is already stored that matches the
            // current task.
            match slot {
                Some(existing) => {
                    if !existing.will_wake(cx.waker()) {
                        *existing = cx.waker().clone();
                    }
                }
                None => {
                    *slot = Some(cx.waker().clone());
                }
            }

            // Try again, in case the readiness was changed while we were
            // taking the waiters lock
            let curr = self.readiness.load(Acquire);
            let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));
            let is_shutdown = SHUTDOWN.unpack(curr) != 0;
            if is_shutdown {
                Poll::Ready(ReadyEvent {
                    tick: TICK.unpack(curr) as u8,
                    ready: direction.mask(),
                    is_shutdown,
                })
            } else if ready.is_empty() {
                Poll::Pending
            } else {
                Poll::Ready(ReadyEvent {
                    tick: TICK.unpack(curr) as u8,
                    ready,
                    is_shutdown,
                })
            }
        } else {
            Poll::Ready(ReadyEvent {
                tick: TICK.unpack(curr) as u8,
                ready,
                is_shutdown,
            })
        }
    }
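    // Note (illustrative, not from the upstream comments): callers of
    // `poll_readiness`, and of the async `readiness()` below, typically drive a
    // loop along these lines, where `try_io` is a placeholder for the actual
    // non-blocking operation:
    //
    //     let event = ready!(scheduled_io.poll_readiness(cx, Direction::Read));
    //     match try_io() {
    //         Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
    //             // The readiness was stale; clear it (the tick guards against
    //             // wiping out a newer event) and poll for readiness again.
    //             scheduled_io.clear_readiness(event);
    //         }
    //         result => return Poll::Ready(result),
    //     }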
    pub(crate) fn clear_readiness(&self, event: ReadyEvent) {
        // This consumes the current readiness state **except** for closed
        // states. Closed states are excluded because they are final states.
        let mask_no_closed = event.ready - Ready::READ_CLOSED - Ready::WRITE_CLOSED;
        self.set_readiness(Tick::Clear(event.tick), |curr| curr - mask_no_closed);
    }

    pub(crate) fn clear_wakers(&self) {
        let mut waiters = self.waiters.lock();
        waiters.reader.take();
        waiters.writer.take();
    }
}

impl Drop for ScheduledIo {
    fn drop(&mut self) {
        self.wake(Ready::ALL);
    }
}

unsafe impl Send for ScheduledIo {}
unsafe impl Sync for ScheduledIo {}

impl ScheduledIo {
    /// An async version of `poll_readiness` which uses a linked list of wakers.
    pub(crate) async fn readiness(&self, interest: Interest) -> ReadyEvent {
        self.readiness_fut(interest).await
    }

    // This is in a separate function so that the borrow checker doesn't think
    // we are borrowing the `UnsafeCell` possibly over await boundaries.
    //
    // Go figure.
    fn readiness_fut(&self, interest: Interest) -> Readiness<'_> {
        Readiness {
            scheduled_io: self,
            state: State::Init,
            waiter: UnsafeCell::new(Waiter {
                pointers: linked_list::Pointers::new(),
                waker: None,
                is_ready: false,
                interest,
                _p: PhantomPinned,
            }),
        }
    }
}

unsafe impl linked_list::Link for Waiter {
    type Handle = NonNull<Waiter>;
    type Target = Waiter;

    fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
        *handle
    }

    unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
        ptr
    }

    unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
        Waiter::addr_of_pointers(target)
    }
}

// ===== impl Readiness =====
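// Overview (added commentary): a `Readiness` future moves through three states.
// In `Init` it optimistically checks the packed readiness word and, if nothing
// is ready, pushes its intrusive `Waiter` node onto `waiters.list` while the
// lock is held. In `Waiting` it stays parked until `wake` marks the waiter
// `is_ready` and invokes its waker. In `Done` it reads the readiness word one
// more time and resolves to a `ReadyEvent`. Because the `Waiter` node lives
// inline in the future and is linked into the list, the future must stay
// pinned (hence `PhantomPinned`) and unlinks itself from the list on drop.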
impl Future for Readiness<'_> {
    type Output = ReadyEvent;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        use std::sync::atomic::Ordering::SeqCst;

        let (scheduled_io, state, waiter) = unsafe {
            let me = self.get_unchecked_mut();
            (&me.scheduled_io, &mut me.state, &me.waiter)
        };

        loop {
            match *state {
                State::Init => {
                    // Optimistically check existing readiness
                    let curr = scheduled_io.readiness.load(SeqCst);
                    let ready = Ready::from_usize(READINESS.unpack(curr));
                    let is_shutdown = SHUTDOWN.unpack(curr) != 0;

                    // Safety: `waiter.interest` never changes
                    let interest = unsafe { (*waiter.get()).interest };
                    let ready = ready.intersection(interest);

                    if !ready.is_empty() || is_shutdown {
                        // Currently ready!
                        let tick = TICK.unpack(curr) as u8;
                        *state = State::Done;
                        return Poll::Ready(ReadyEvent {
                            tick,
                            ready,
                            is_shutdown,
                        });
                    }

                    // Wasn't ready, take the lock (and check again while locked).
                    let mut waiters = scheduled_io.waiters.lock();

                    let curr = scheduled_io.readiness.load(SeqCst);
                    let mut ready = Ready::from_usize(READINESS.unpack(curr));
                    let is_shutdown = SHUTDOWN.unpack(curr) != 0;

                    if is_shutdown {
                        ready = Ready::ALL;
                    }

                    let ready = ready.intersection(interest);

                    if !ready.is_empty() || is_shutdown {
                        // Currently ready!
                        let tick = TICK.unpack(curr) as u8;
                        *state = State::Done;
                        return Poll::Ready(ReadyEvent {
                            tick,
                            ready,
                            is_shutdown,
                        });
                    }

                    // Not ready even after locked, insert into list...

                    // Safety: called while locked
                    unsafe {
                        (*waiter.get()).waker = Some(cx.waker().clone());
                    }

                    // Insert the waiter into the linked list
                    //
                    // safety: pointers from `UnsafeCell` are never null.
                    waiters
                        .list
                        .push_front(unsafe { NonNull::new_unchecked(waiter.get()) });
                    *state = State::Waiting;
                }
                State::Waiting => {
                    // Currently in the "Waiting" state, implying the caller has
                    // a waiter stored in the waiter list (guarded by
                    // `scheduled_io.waiters`). In order to access the waker
                    // fields, we must hold the lock.

                    let waiters = scheduled_io.waiters.lock();

                    // Safety: called while locked
                    let w = unsafe { &mut *waiter.get() };

                    if w.is_ready {
                        // Our waker has been notified.
                        *state = State::Done;
                    } else {
                        // Update the waker, if necessary.
                        if !w.waker.as_ref().unwrap().will_wake(cx.waker()) {
                            w.waker = Some(cx.waker().clone());
                        }

                        return Poll::Pending;
                    }

                    // Explicit drop of the lock to indicate the scope that the
                    // lock is held. Because holding the lock is required to
                    // ensure safe access to fields not held within the lock, it
                    // is helpful to visualize the scope of the critical
                    // section.
                    drop(waiters);
                }
                State::Done => {
                    // Safety: State::Done means it is no longer shared
                    let w = unsafe { &mut *waiter.get() };

                    let curr = scheduled_io.readiness.load(Acquire);
                    let is_shutdown = SHUTDOWN.unpack(curr) != 0;

                    // The returned tick might be newer than the event
                    // which notified our waker. This is ok because the future
                    // still didn't return `Poll::Ready`.
                    let tick = TICK.unpack(curr) as u8;

                    // The readiness state could have been cleared in the meantime,
                    // but we allow the returned ready set to be empty.
                    let curr_ready = Ready::from_usize(READINESS.unpack(curr));
                    let ready = curr_ready.intersection(w.interest);

                    return Poll::Ready(ReadyEvent {
                        tick,
                        ready,
                        is_shutdown,
                    });
                }
            }
        }
    }
}

impl Drop for Readiness<'_> {
    fn drop(&mut self) {
        let mut waiters = self.scheduled_io.waiters.lock();

        // Safety: `waiter` is only ever stored in `waiters`
        unsafe {
            waiters
                .list
                .remove(NonNull::new_unchecked(self.waiter.get()))
        };
    }
}

unsafe impl Send for Readiness<'_> {}
unsafe impl Sync for Readiness<'_> {}
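// The following test module is an illustrative sketch (not part of the
// original file) exercising the bit-packing constants and the basic
// readiness/shutdown flow described above. It relies only on items already
// defined or imported in this module.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn readiness_word_round_trip() {
        // Pack a readiness value and a driver tick into one word, then unpack.
        let word = TICK.pack(3, READINESS.pack(0b01, 0));
        assert_eq!(READINESS.unpack(word), 0b01);
        assert_eq!(TICK.unpack(word), 3);
        assert_eq!(SHUTDOWN.unpack(word), 0);

        // Setting the shutdown flag does not disturb the other fields.
        let word = SHUTDOWN.pack(1, word);
        assert_eq!(SHUTDOWN.unpack(word), 1);
        assert_eq!(TICK.unpack(word), 3);
        assert_eq!(READINESS.unpack(word), 0b01);

        // The cache-padding attributes above should give the struct a large
        // alignment on the architectures they cover.
        if cfg!(target_arch = "x86_64") {
            assert!(std::mem::align_of::<ScheduledIo>() >= 128);
        }
    }

    #[test]
    fn shutdown_marks_ready_event() {
        let io = ScheduledIo::default();

        // A fresh `ScheduledIo` reports no readiness and is not shut down.
        let event = io.ready_event(Interest::READABLE);
        assert!(event.ready.is_empty());
        assert!(!event.is_shutdown);

        // After `shutdown`, the shutdown bit is observable via `ready_event`.
        io.shutdown();
        assert!(io.ready_event(Interest::READABLE).is_shutdown);
    }
}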