use crate::io::interest::Interest;
use crate::io::ready::Ready;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Mutex;
use crate::runtime::io::{Direction, ReadyEvent, Tick};
use crate::util::bit;
use crate::util::linked_list::{self, LinkedList};
use crate::util::WakeList;

use std::cell::UnsafeCell;
use std::future::Future;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::atomic::Ordering::{AcqRel, Acquire};
use std::task::{Context, Poll, Waker};

/// Stored in the I/O driver resource slab.
#[derive(Debug)]
// # This struct should be cache padded to avoid false sharing. The cache padding rules are copied
// from crossbeam-utils/src/cache_padded.rs
//
// Starting from Intel's Sandy Bridge, the spatial prefetcher pulls pairs of 64-byte cache
// lines at a time, so we have to align to 128 bytes rather than 64.
//
// Sources:
// - https://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf
// - https://github.com/facebook/folly/blob/1b5288e6eea6df074758f877c849b6e73bbb9fbb/folly/lang/Align.h#L107
//
// ARM's big.LITTLE architecture has asymmetric cores and "big" cores have 128-byte cache line size.
//
// Sources:
// - https://www.mono-project.com/news/2016/09/12/arm64-icache/
//
// powerpc64 has 128-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_ppc64x.go#L9
#[cfg_attr(
    any(
        target_arch = "x86_64",
        target_arch = "aarch64",
        target_arch = "powerpc64",
    ),
    repr(align(128))
)]
// arm, mips, mips64, riscv64, sparc, and hexagon have 32-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_arm.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mipsle.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips64x.go#L9
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_riscv64.go#L7
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L17
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/hexagon/include/asm/cache.h#L12
//
// riscv32 is assumed not to exceed the cache line size of riscv64.
#[cfg_attr(
    any(
        target_arch = "arm",
        target_arch = "mips",
        target_arch = "mips64",
        target_arch = "riscv32",
        target_arch = "riscv64",
        target_arch = "sparc",
        target_arch = "hexagon",
    ),
    repr(align(32))
)]
// m68k has 16-byte cache line size.
//
// Sources:
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/m68k/include/asm/cache.h#L9
#[cfg_attr(target_arch = "m68k", repr(align(16)))]
// s390x has 256-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_s390x.go#L7
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/s390/include/asm/cache.h#L13
#[cfg_attr(target_arch = "s390x", repr(align(256)))]
// x86, wasm, and sparc64 have 64-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/dda2991c2ea0c5914714469c4defc2562a907230/src/internal/cpu/cpu_x86.go#L9
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_wasm.go#L7
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L19
//
// All others are assumed to have 64-byte cache line size.
#[cfg_attr(
    not(any(
        target_arch = "x86_64",
        target_arch = "aarch64",
        target_arch = "powerpc64",
        target_arch = "arm",
        target_arch = "mips",
        target_arch = "mips64",
        target_arch = "riscv32",
        target_arch = "riscv64",
        target_arch = "sparc",
        target_arch = "hexagon",
        target_arch = "m68k",
        target_arch = "s390x",
    )),
    repr(align(64))
)]
pub(crate) struct ScheduledIo {
    pub(super) linked_list_pointers: UnsafeCell<linked_list::Pointers<Self>>,

    /// Packs the resource's readiness with the I/O driver's latest tick.
    readiness: AtomicUsize,

    waiters: Mutex<Waiters>,
}
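
// A quick way to sanity-check the padding (a sketch, not upstream code;
// `ScheduledIo` is crate-private, so a check like this would have to live in
// an in-crate test):
//
//     #[test]
//     fn scheduled_io_is_cache_padded() {
//         #[cfg(target_arch = "x86_64")]
//         assert_eq!(std::mem::align_of::<ScheduledIo>(), 128);
//     }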

type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;

#[derive(Debug, Default)]
struct Waiters {
    /// List of all current waiters.
    list: WaitList,

    /// Waker used for AsyncRead.
    reader: Option<Waker>,

    /// Waker used for AsyncWrite.
    writer: Option<Waker>,
}

#[derive(Debug)]
struct Waiter {
    pointers: linked_list::Pointers<Waiter>,

    /// The waker for this task.
    waker: Option<Waker>,

    /// The interest this waiter is waiting on.
    interest: Interest,

    is_ready: bool,

    /// Ensures this type is `!Unpin`: waiters are linked into an intrusive
    /// list and must never be moved once inserted.
    _p: PhantomPinned,
}

generate_addr_of_methods! {
    impl<> Waiter {
        unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
            &self.pointers
        }
    }
}
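
// Rationale for the macro (a sketch): taking `&waiter.pointers` at a call site
// would materialize a `&Waiter` reference while the waiter may also be
// reachable through the wait list, which pointer-provenance models such as
// Stacked Borrows can flag as undefined behavior. The generated method derives
// the field address from the `NonNull<Waiter>` without creating an
// intermediate reference.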

/// Future returned by `readiness()`.
struct Readiness<'a> {
    scheduled_io: &'a ScheduledIo,

    state: State,

    /// Entry in the waiter `LinkedList`.
    waiter: UnsafeCell<Waiter>,
}

enum State {
    Init,
    Waiting,
    Done,
}
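
// State transitions of `Readiness::poll` (a summary of the implementation
// further below):
//
//     Init    --(already ready, or shutdown)------------> Done
//     Init    --(not ready; waiter pushed onto list)----> Waiting
//     Waiting --(woken by `ScheduledIo::wake`)----------> Done
//
// `Done` re-reads the readiness word, so the reported readiness may differ
// from the event that woke the task.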

// The `ScheduledIo::readiness` (`AtomicUsize`) is packed full of goodness.
//
// | shutdown | driver tick | readiness |
// |----------|-------------|-----------|
// |  1 bit   |   8 bits    |  16 bits  |

const READINESS: bit::Pack = bit::Pack::least_significant(16);

const TICK: bit::Pack = READINESS.then(8);

const SHUTDOWN: bit::Pack = TICK.then(1);
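
// A worked example of the packing (a sketch, assuming `Pack::pack(value, base)`
// clears the field in `base` and ORs in the shifted `value`, `Pack::unpack(src)`
// masks and shifts it back out, and `Ready::READABLE` occupies the lowest
// readiness bit):
//
//     let readable = Ready::READABLE.as_usize(); // 0b1
//     let packed = TICK.pack(3, readable);       // (3 << 16) | 0b1
//     assert_eq!(TICK.unpack(packed), 3);
//     assert_eq!(READINESS.unpack(packed), 0b1);
//     assert_eq!(SHUTDOWN.unpack(packed), 0);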

// ===== impl ScheduledIo =====

impl Default for ScheduledIo {
    fn default() -> ScheduledIo {
        ScheduledIo {
            linked_list_pointers: UnsafeCell::new(linked_list::Pointers::new()),
            readiness: AtomicUsize::new(0),
            waiters: Mutex::new(Default::default()),
        }
    }
}

impl ScheduledIo {
    pub(crate) fn token(&self) -> mio::Token {
        // use `expose_addr` when stable
        mio::Token(self as *const _ as usize)
    }

    /// Invoked when the I/O driver is shut down; forces this `ScheduledIo` into a
    /// permanently shutdown state.
    pub(super) fn shutdown(&self) {
        let mask = SHUTDOWN.pack(1, 0);
        self.readiness.fetch_or(mask, AcqRel);
        self.wake(Ready::ALL);
    }

    /// Sets the readiness on this `ScheduledIo` by invoking the given closure
    /// on the current readiness value.
    ///
    /// # Arguments
    /// - `tick`: whether to unconditionally set the driver tick, or to clear
    ///   readiness only if the event's tick still matches the stored tick.
    /// - `f`: a closure returning a new readiness value given the previous
    ///   readiness.
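    ///
    /// For example, `clear_readiness` below uses the `Tick::Clear` form so that
    /// a stale event cannot clear readiness set by a newer driver tick:
    ///
    /// ```ignore
    /// self.set_readiness(Tick::Clear(event.tick), |curr| curr - mask_no_closed);
    /// ```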
    pub(super) fn set_readiness(&self, tick: Tick, f: impl Fn(Ready) -> Ready) {
        let mut current = self.readiness.load(Acquire);

        // The shutdown bit should not be set
        debug_assert_eq!(0, SHUTDOWN.unpack(current));

        loop {
            // Mask out the tick bits so that the modifying function doesn't see
            // them.
            let current_readiness = Ready::from_usize(current);
            let new = f(current_readiness);

            let next = match tick {
                Tick::Set(t) => TICK.pack(t as usize, new.as_usize()),
                Tick::Clear(t) => {
                    if TICK.unpack(current) as u8 != t {
                        // Trying to clear readiness with an old event!
                        return;
                    }

                    TICK.pack(t as usize, new.as_usize())
                }
            };

            match self
                .readiness
                .compare_exchange(current, next, AcqRel, Acquire)
            {
                Ok(_) => return,
                // we lost the race, retry!
                Err(actual) => current = actual,
            }
        }
    }

    /// Notifies all pending waiters that have registered interest in `ready`.
    ///
    /// There may be many waiters to notify. Waking the pending tasks **must**
    /// be done from outside of the lock; otherwise there is a potential for a
    /// deadlock.
    ///
    /// A stack array is filled with the wakers to notify, the lock is
    /// released, and the wakers are invoked. Because there may be more than
    /// 32 wakers to notify, if the stack array fills up, the lock is released,
    /// the array is emptied, and the iteration continues.
    pub(super) fn wake(&self, ready: Ready) {
        let mut wakers = WakeList::new();

        let mut waiters = self.waiters.lock();

        // check for AsyncRead slot
        if ready.is_readable() {
            if let Some(waker) = waiters.reader.take() {
                wakers.push(waker);
            }
        }

        // check for AsyncWrite slot
        if ready.is_writable() {
            if let Some(waker) = waiters.writer.take() {
                wakers.push(waker);
            }
        }

        'outer: loop {
            let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest));

            while wakers.can_push() {
                match iter.next() {
                    Some(waiter) => {
                        let waiter = unsafe { &mut *waiter.as_ptr() };

                        if let Some(waker) = waiter.waker.take() {
                            waiter.is_ready = true;
                            wakers.push(waker);
                        }
                    }
                    None => {
                        break 'outer;
                    }
                }
            }

            drop(waiters);

            wakers.wake_all();

            // Acquire the lock again.
            waiters = self.waiters.lock();
        }

        // Release the lock before notifying
        drop(waiters);

        wakers.wake_all();
    }

    pub(super) fn ready_event(&self, interest: Interest) -> ReadyEvent {
        let curr = self.readiness.load(Acquire);

        ReadyEvent {
            tick: TICK.unpack(curr) as u8,
            ready: interest.mask() & Ready::from_usize(READINESS.unpack(curr)),
            is_shutdown: SHUTDOWN.unpack(curr) != 0,
        }
    }

    /// Polls for readiness events in a given direction.
    ///
    /// This supports the `AsyncRead` and `AsyncWrite` polling methods, which
    /// cannot use the `async fn` version (`readiness`). It uses the reserved
    /// reader and writer waker slots rather than the waiter list.
    pub(super) fn poll_readiness(
        &self,
        cx: &mut Context<'_>,
        direction: Direction,
    ) -> Poll<ReadyEvent> {
        let curr = self.readiness.load(Acquire);

        let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));
        let is_shutdown = SHUTDOWN.unpack(curr) != 0;

        if ready.is_empty() && !is_shutdown {
            // Update the task info
            let mut waiters = self.waiters.lock();
            let slot = match direction {
                Direction::Read => &mut waiters.reader,
                Direction::Write => &mut waiters.writer,
            };

            // Avoid cloning the waker if one is already stored that matches the
            // current task.
            match slot {
                Some(existing) => {
                    if !existing.will_wake(cx.waker()) {
                        *existing = cx.waker().clone();
                    }
                }
                None => {
                    *slot = Some(cx.waker().clone());
                }
            }

            // Try again, in case the readiness was changed while we were
            // taking the waiters lock
            let curr = self.readiness.load(Acquire);
            let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));
            let is_shutdown = SHUTDOWN.unpack(curr) != 0;
            if is_shutdown {
                Poll::Ready(ReadyEvent {
                    tick: TICK.unpack(curr) as u8,
                    ready: direction.mask(),
                    is_shutdown,
                })
            } else if ready.is_empty() {
                Poll::Pending
            } else {
                Poll::Ready(ReadyEvent {
                    tick: TICK.unpack(curr) as u8,
                    ready,
                    is_shutdown,
                })
            }
        } else {
            Poll::Ready(ReadyEvent {
                tick: TICK.unpack(curr) as u8,
                ready,
                is_shutdown,
            })
        }
    }

    pub(crate) fn clear_readiness(&self, event: ReadyEvent) {
        // This consumes the current readiness state **except** for closed
        // states. Closed states are excluded because they are final states.
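        //
        // For example (a sketch): if `event.ready` were `READABLE | READ_CLOSED`,
        // only `READABLE` would be cleared here; `READ_CLOSED` keeps being
        // reported to subsequent polls.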
        let mask_no_closed = event.ready - Ready::READ_CLOSED - Ready::WRITE_CLOSED;
        self.set_readiness(Tick::Clear(event.tick), |curr| curr - mask_no_closed);
    }

    pub(crate) fn clear_wakers(&self) {
        let mut waiters = self.waiters.lock();
        waiters.reader.take();
        waiters.writer.take();
    }
}

impl Drop for ScheduledIo {
    fn drop(&mut self) {
        self.wake(Ready::ALL);
    }
}

unsafe impl Send for ScheduledIo {}
unsafe impl Sync for ScheduledIo {}

impl ScheduledIo {
    /// An async version of `poll_readiness` which uses a linked list of wakers.
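    ///
    /// Typical use (a sketch; `scheduled_io` is a resource registered with the
    /// driver):
    ///
    /// ```ignore
    /// let event = scheduled_io.readiness(Interest::READABLE).await;
    /// if !event.is_shutdown {
    ///     // the resource is now readable (or read-closed)
    /// }
    /// ```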
    pub(crate) async fn readiness(&self, interest: Interest) -> ReadyEvent {
        self.readiness_fut(interest).await
    }

    // This is in a separate function so that the borrow checker doesn't think
    // we are borrowing the `UnsafeCell` possibly over await boundaries.
    //
    // Go figure.
    fn readiness_fut(&self, interest: Interest) -> Readiness<'_> {
        Readiness {
            scheduled_io: self,
            state: State::Init,
            waiter: UnsafeCell::new(Waiter {
                pointers: linked_list::Pointers::new(),
                waker: None,
                is_ready: false,
                interest,
                _p: PhantomPinned,
            }),
        }
    }
}

unsafe impl linked_list::Link for Waiter {
    type Handle = NonNull<Waiter>;
    type Target = Waiter;

    fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
        *handle
    }

    unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
        ptr
    }

    unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
        Waiter::addr_of_pointers(target)
    }
}

// ===== impl Readiness =====

impl Future for Readiness<'_> {
    type Output = ReadyEvent;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        use std::sync::atomic::Ordering::SeqCst;

        let (scheduled_io, state, waiter) = unsafe {
            let me = self.get_unchecked_mut();
            (&me.scheduled_io, &mut me.state, &me.waiter)
        };

        loop {
            match *state {
                State::Init => {
                    // Optimistically check existing readiness
                    let curr = scheduled_io.readiness.load(SeqCst);
                    let ready = Ready::from_usize(READINESS.unpack(curr));
                    let is_shutdown = SHUTDOWN.unpack(curr) != 0;

                    // Safety: `waiter.interest` never changes
                    let interest = unsafe { (*waiter.get()).interest };
                    let ready = ready.intersection(interest);

                    if !ready.is_empty() || is_shutdown {
                        // Currently ready!
                        let tick = TICK.unpack(curr) as u8;
                        *state = State::Done;
                        return Poll::Ready(ReadyEvent {
                            tick,
                            ready,
                            is_shutdown,
                        });
                    }

                    // Wasn't ready, take the lock (and check again while locked).
                    let mut waiters = scheduled_io.waiters.lock();

                    let curr = scheduled_io.readiness.load(SeqCst);
                    let mut ready = Ready::from_usize(READINESS.unpack(curr));
                    let is_shutdown = SHUTDOWN.unpack(curr) != 0;

                    if is_shutdown {
                        ready = Ready::ALL;
                    }

                    let ready = ready.intersection(interest);

                    if !ready.is_empty() || is_shutdown {
                        // Currently ready!
                        let tick = TICK.unpack(curr) as u8;
                        *state = State::Done;
                        return Poll::Ready(ReadyEvent {
                            tick,
                            ready,
                            is_shutdown,
                        });
                    }

                    // Not ready even after locked, insert into list...

                    // Safety: called while locked
                    unsafe {
                        (*waiter.get()).waker = Some(cx.waker().clone());
                    }

                    // Insert the waiter into the linked list
                    //
                    // Safety: pointers from `UnsafeCell` are never null.
                    waiters
                        .list
                        .push_front(unsafe { NonNull::new_unchecked(waiter.get()) });
                    *state = State::Waiting;
                }
                State::Waiting => {
                    // Currently in the "Waiting" state, implying the caller has
                    // a waiter stored in the waiter list (guarded by
                    // `scheduled_io.waiters`). In order to access the waker
                    // field, we must hold the lock.

                    let waiters = scheduled_io.waiters.lock();

                    // Safety: called while locked
                    let w = unsafe { &mut *waiter.get() };

                    if w.is_ready {
                        // Our waker has been notified.
                        *state = State::Done;
                    } else {
                        // Update the waker, if necessary.
                        if !w.waker.as_ref().unwrap().will_wake(cx.waker()) {
                            w.waker = Some(cx.waker().clone());
                        }

                        return Poll::Pending;
                    }

                    // Explicitly drop the lock here to delimit the critical
                    // section. Holding the lock is what makes it safe to access
                    // fields that are not themselves protected by it, so it
                    // helps to see exactly where the lock is held.
                    drop(waiters);
                }
                State::Done => {
                    // Safety: `State::Done` means the waiter is no longer shared.
                    let w = unsafe { &mut *waiter.get() };

                    let curr = scheduled_io.readiness.load(Acquire);
                    let is_shutdown = SHUTDOWN.unpack(curr) != 0;

                    // The returned tick might be newer than the event that
                    // notified our waker. This is fine because the future
                    // hasn't returned `Poll::Ready` yet.
                    let tick = TICK.unpack(curr) as u8;

                    // The readiness state could have been cleared in the meantime,
                    // but we allow the returned ready set to be empty.
                    let curr_ready = Ready::from_usize(READINESS.unpack(curr));
                    let ready = curr_ready.intersection(w.interest);

                    return Poll::Ready(ReadyEvent {
                        tick,
                        ready,
                        is_shutdown,
                    });
                }
            }
        }
    }
}

impl Drop for Readiness<'_> {
    fn drop(&mut self) {
        let mut waiters = self.scheduled_io.waiters.lock();

        // Safety: `waiter` is only ever stored in `waiters`
        unsafe {
            waiters
                .list
                .remove(NonNull::new_unchecked(self.waiter.get()))
        };
    }
}

unsafe impl Send for Readiness<'_> {}
unsafe impl Sync for Readiness<'_> {}
601