• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::cell::UnsafeCell;
15 use std::future::Future;
16 use std::io;
17 use std::marker::PhantomPinned;
18 use std::pin::Pin;
19 use std::ptr::{addr_of_mut, NonNull};
20 use std::sync::atomic::AtomicUsize;
21 use std::sync::atomic::Ordering::{AcqRel, Acquire, Release, SeqCst};
22 use std::sync::Mutex;
23 use std::task::{Context, Poll, Waker};
24 
25 use ylong_io::Interest;
26 
27 use crate::futures::poll_fn;
28 use crate::net::{Ready, ReadyEvent};
29 use crate::util::bit::{Bit, Mask};
30 use crate::util::linked_list::{Link, LinkedList, Node};
31 use crate::util::slab::Entry;
32 
33 const GENERATION: Mask = Mask::new(7, 24);
34 pub(crate) const DRIVER_TICK: Mask = Mask::new(8, 16);
35 pub(crate) const READINESS: Mask = Mask::new(16, 0);
36 
37 // ScheduleIO::status structure
38 //
39 // | reserved | generation | driver tick | readiness |
40 // |----------|------------|-------------|-----------|
41 // |  1 bit   |   7 bits   |   8 bits    |  16 bits  |
42 pub(crate) struct ScheduleIO {
43     /// ScheduleIO status
44     pub(crate) status: AtomicUsize,
45 
46     /// Wakers that wait for this IO
47     waiters: Mutex<Waiters>,
48 }
49 
50 #[derive(Default)]
51 pub(crate) struct Waiters {
52     list: LinkedList<Waiter>,
53 
54     // Reader & writer wakers are for AsyncRead/AsyncWriter
55     reader: Option<Waker>,
56 
57     writer: Option<Waker>,
58 
59     is_shutdown: bool,
60 }
61 
62 pub(crate) struct Waiter {
63     waker: Option<Waker>,
64 
65     interest: Interest,
66 
67     is_ready: bool,
68 
69     node: Node<Waiter>,
70 
71     _p: PhantomPinned,
72 }
73 
/// How `set_readiness` should treat the driver tick field.
pub(crate) enum Tick {
    /// Store this tick unconditionally.
    Set(u8),
    /// Proceed only if the stored tick still equals this one; otherwise
    /// `set_readiness` fails with "Readiness has been covered.".
    Clear(u8),
}
78 
79 impl Default for ScheduleIO {
default() -> Self80     fn default() -> Self {
81         ScheduleIO {
82             status: AtomicUsize::new(0),
83             waiters: Mutex::new(Default::default()),
84         }
85     }
86 }
87 
88 impl Default for Waiter {
default() -> Self89     fn default() -> Self {
90         Waiter {
91             waker: None,
92             interest: Interest::READABLE,
93             is_ready: false,
94             node: Node::new(),
95             _p: PhantomPinned,
96         }
97     }
98 }
99 
100 unsafe impl Link for Waiter {
node(mut ptr: NonNull<Self>) -> NonNull<Node<Self>> where Self: Sized,101     unsafe fn node(mut ptr: NonNull<Self>) -> NonNull<Node<Self>>
102     where
103         Self: Sized,
104     {
105         let node_ptr = addr_of_mut!(ptr.as_mut().node);
106         NonNull::new_unchecked(node_ptr)
107     }
108 }
109 
110 impl Entry for ScheduleIO {
reset(&self)111     fn reset(&self) {
112         let status_bit = Bit::from_usize(self.status.load(Acquire));
113 
114         let generation = status_bit.get_by_mask(GENERATION);
115         let new_generation = generation.wrapping_add(1);
116         let mut next = Bit::from_usize(0);
117         next.set_by_mask(GENERATION, new_generation);
118         self.status.store(next.as_usize(), Release);
119     }
120 }
121 
122 impl ScheduleIO {
generation(&self) -> usize123     pub fn generation(&self) -> usize {
124         let base = Bit::from_usize(self.status.load(Acquire));
125         base.get_by_mask(GENERATION)
126     }
127 
128     #[cfg(feature = "net")]
poll_readiness( &self, cx: &mut Context<'_>, interest: Interest, ) -> Poll<ReadyEvent>129     pub(crate) fn poll_readiness(
130         &self,
131         cx: &mut Context<'_>,
132         interest: Interest,
133     ) -> Poll<ReadyEvent> {
134         // Get current status and check if it contains our interest
135         let curr_bit = Bit::from_usize(self.status.load(Acquire));
136         let ready = Ready::from_usize(curr_bit.get_by_mask(READINESS)).intersection(interest);
137 
138         if ready.is_empty() {
139             let mut waiters = self.waiters.lock().unwrap();
140             // Put the waker associated with the context into the waiters
141             match interest {
142                 Interest::WRITABLE => waiters.writer = Some(cx.waker().clone()),
143                 Interest::READABLE => waiters.reader = Some(cx.waker().clone()),
144                 _ => unreachable!(),
145             }
146 
147             // Check one more time to see if any event is ready
148             let ready_event = self.get_readiness(interest);
149             if !waiters.is_shutdown && ready_event.ready.is_empty() {
150                 Poll::Pending
151             } else {
152                 Poll::Ready(ready_event)
153             }
154         } else {
155             let tick = curr_bit.get_by_mask(DRIVER_TICK) as u8;
156             Poll::Ready(ReadyEvent::new(tick, ready))
157         }
158     }
159 
160     #[inline]
get_readiness(&self, interest: Interest) -> ReadyEvent161     pub(crate) fn get_readiness(&self, interest: Interest) -> ReadyEvent {
162         let status_bit = Bit::from_usize(self.status.load(Acquire));
163         let ready = Ready::from_usize(status_bit.get_by_mask(READINESS)).intersection(interest);
164         let tick = status_bit.get_by_mask(DRIVER_TICK) as u8;
165         ReadyEvent::new(tick, ready)
166     }
167 
readiness(&self, interest: Interest) -> io::Result<ReadyEvent>168     pub(crate) async fn readiness(&self, interest: Interest) -> io::Result<ReadyEvent> {
169         let mut fut = self.readiness_fut(interest);
170         let mut fut = unsafe { Pin::new_unchecked(&mut fut) };
171 
172         poll_fn(|cx| Pin::new(&mut fut).poll(cx)).await
173     }
174 
readiness_fut(&self, interest: Interest) -> io::Result<ReadyEvent>175     async fn readiness_fut(&self, interest: Interest) -> io::Result<ReadyEvent> {
176         Readiness::new(self, interest).await
177     }
178 
shutdown(&self)179     pub(crate) fn shutdown(&self) {
180         self.wake0(Ready::ALL, true);
181     }
182 
clear_readiness(&self, ready: ReadyEvent)183     pub(crate) fn clear_readiness(&self, ready: ReadyEvent) {
184         let mask_no_closed = ready.get_ready() - Ready::READ_CLOSED - Ready::WRITE_CLOSED;
185         let _ = self.set_readiness(None, Tick::Clear(ready.get_tick()), |curr| {
186             curr - mask_no_closed
187         });
188     }
189 
set_readiness( &self, token: Option<usize>, tick: Tick, f: impl Fn(Ready) -> Ready, ) -> io::Result<()>190     pub(crate) fn set_readiness(
191         &self,
192         token: Option<usize>,
193         tick: Tick,
194         f: impl Fn(Ready) -> Ready,
195     ) -> io::Result<()> {
196         let mut current = self.status.load(Acquire);
197         loop {
198             let current_bit = Bit::from_usize(current);
199             let current_generation = current_bit.get_by_mask(GENERATION);
200 
201             // if token's generation is different from ScheduleIO's generation,
202             // this token is already expired.
203             if let Some(token) = token {
204                 if Bit::from_usize(token).get_by_mask(GENERATION) != current_generation {
205                     return Err(io::Error::new(
206                         io::ErrorKind::Other,
207                         "Token no longer valid.",
208                     ));
209                 }
210             }
211 
212             let current_readiness = Ready::from_usize(current_bit.get_by_mask(READINESS));
213             let new_readiness = f(current_readiness);
214             let mut new_bit = Bit::from_usize(new_readiness.as_usize());
215 
216             match tick {
217                 Tick::Set(t) => new_bit.set_by_mask(DRIVER_TICK, t as usize),
218                 // Check the tick to see if the event has already been covered.
219                 // If yes, clear the event.
220                 Tick::Clear(t) => {
221                     if current_bit.get_by_mask(DRIVER_TICK) as u8 != t {
222                         return Err(io::Error::new(
223                             io::ErrorKind::Other,
224                             "Readiness has been covered.",
225                         ));
226                     }
227                     new_bit.set_by_mask(DRIVER_TICK, t as usize);
228                 }
229             }
230 
231             new_bit.set_by_mask(GENERATION, current_generation);
232             match self
233                 .status
234                 .compare_exchange(current, new_bit.as_usize(), AcqRel, Acquire)
235             {
236                 Ok(_) => return Ok(()),
237                 // status has been changed already, so we repeats the loop
238                 Err(actual) => current = actual,
239             }
240         }
241     }
242 
wake(&self, ready: Ready)243     pub(crate) fn wake(&self, ready: Ready) {
244         self.wake0(ready, false);
245     }
246 
wake0(&self, ready: Ready, shutdown: bool)247     fn wake0(&self, ready: Ready, shutdown: bool) {
248         let mut wakers = Vec::new();
249         let mut waiters = self.waiters.lock().unwrap();
250         waiters.is_shutdown |= shutdown;
251 
252         if ready.is_readable() {
253             if let Some(waker) = waiters.reader.take() {
254                 wakers.push(Some(waker));
255             }
256         }
257 
258         if ready.is_writable() {
259             if let Some(waker) = waiters.writer.take() {
260                 wakers.push(Some(waker));
261             }
262         }
263 
264         waiters.list.drain_filtered(|waiter| {
265             if ready.satisfies(waiter.interest) {
266                 if let Some(waker) = waiter.waker.take() {
267                     waiter.is_ready = true;
268                     wakers.push(Some(waker));
269                 }
270                 return true;
271             }
272             false
273         });
274 
275         drop(waiters);
276         for waker in wakers.iter_mut() {
277             waker.take().unwrap().wake();
278         }
279     }
280 }
281 
282 impl Drop for ScheduleIO {
drop(&mut self)283     fn drop(&mut self) {
284         self.wake(Ready::ALL);
285     }
286 }
287 
288 unsafe impl Send for ScheduleIO {}
289 unsafe impl Sync for ScheduleIO {}
290 
291 pub(crate) struct Readiness<'a> {
292     schedule_io: &'a ScheduleIO,
293 
294     state: State,
295 
296     waiter: UnsafeCell<Waiter>,
297 }
298 
/// Lifecycle of a `Readiness` future.
enum State {
    /// Not yet registered in the waiter list.
    Init,
    /// Queued in the waiter list, waiting to be marked ready.
    Waiting,
    /// Readiness was observed; polling returns a ready event.
    Done,
}
304 
305 impl Readiness<'_> {
new(schedule_io: &ScheduleIO, interest: Interest) -> Readiness<'_>306     pub(crate) fn new(schedule_io: &ScheduleIO, interest: Interest) -> Readiness<'_> {
307         Readiness {
308             schedule_io,
309             state: State::Init,
310             waiter: UnsafeCell::new(Waiter {
311                 waker: None,
312                 interest,
313                 is_ready: false,
314                 node: Node::new(),
315                 _p: PhantomPinned,
316             }),
317         }
318     }
319 }
320 
321 impl Future for Readiness<'_> {
322     type Output = io::Result<ReadyEvent>;
323 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>324     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
325         let (schedule_io, state, waiter) = unsafe {
326             let me = self.get_unchecked_mut();
327             (me.schedule_io, &mut me.state, &me.waiter)
328         };
329         // Safety: `waiter.interest` never changes after initialization.
330         let interest = unsafe { (*waiter.get()).interest };
331         loop {
332             match *state {
333                 State::Init => {
334                     let status_bit = Bit::from_usize(schedule_io.status.load(SeqCst));
335                     let readiness = Ready::from_usize(status_bit.get_by_mask(READINESS));
336                     let ready = readiness.intersection(interest);
337 
338                     // if events are ready, change status to done
339                     if !ready.is_empty() {
340                         let tick = status_bit.get_by_mask(DRIVER_TICK) as u8;
341                         *state = State::Done;
342                         return Poll::Ready(Ok(ReadyEvent::new(tick, ready)));
343                     }
344 
345                     let mut waiters = schedule_io.waiters.lock().unwrap();
346 
347                     let status_bit = Bit::from_usize(schedule_io.status.load(SeqCst));
348                     let mut readiness = Ready::from_usize(status_bit.get_by_mask(READINESS));
349 
350                     if waiters.is_shutdown {
351                         readiness = Ready::ALL;
352                     }
353 
354                     let ready = readiness.intersection(interest);
355 
356                     // check one more time to see if events are ready
357                     if !ready.is_empty() {
358                         let tick = status_bit.get_by_mask(DRIVER_TICK) as u8;
359                         *state = State::Done;
360                         return Poll::Ready(Ok(ReadyEvent::new(tick, ready)));
361                     }
362 
363                     unsafe {
364                         (*waiter.get()).waker = Some(cx.waker().clone());
365 
366                         waiters
367                             .list
368                             .push_front(NonNull::new_unchecked(waiter.get()));
369                     }
370 
371                     *state = State::Waiting;
372                 }
373                 State::Waiting => {
374                     // waiters could also be accessed in other places, so get the lock
375                     let waiters = schedule_io.waiters.lock().unwrap();
376 
377                     let waiter = unsafe { &mut *waiter.get() };
378                     if waiter.is_ready {
379                         *state = State::Done;
380                     } else {
381                         if !waiter.waker.as_ref().unwrap().will_wake(cx.waker()) {
382                             waiter.waker = Some(cx.waker().clone());
383                         }
384                         return Poll::Pending;
385                     }
386                     drop(waiters);
387                 }
388                 State::Done => {
389                     let status_bit = Bit::from_usize(schedule_io.status.load(Acquire));
390                     return Poll::Ready(Ok(ReadyEvent::new(
391                         status_bit.get_by_mask(DRIVER_TICK) as u8,
392                         Ready::from_interest(interest),
393                     )));
394                 }
395             }
396         }
397     }
398 }
399 
400 unsafe impl Sync for Readiness<'_> {}
401 unsafe impl Send for Readiness<'_> {}
402 
403 impl Drop for Readiness<'_> {
drop(&mut self)404     fn drop(&mut self) {
405         let mut waiters = self.schedule_io.waiters.lock().unwrap();
406         // Safety: There is only one queue holding the node, and this is the only way
407         // for the node to dequeue.
408         unsafe {
409             waiters
410                 .list
411                 .remove(NonNull::new_unchecked(self.waiter.get()));
412         }
413     }
414 }
415 
#[cfg(test)]
mod schedule_io_test {
    use std::io;
    use std::sync::atomic::Ordering::{Acquire, Release};

    use crate::net::{Ready, ReadyEvent, ScheduleIO, Tick};
    use crate::util::slab::Entry;

    /// UT test cases for schedule_io default
    ///
    /// # Brief
    /// 1. Call default
    /// 2. Verify that the status is zero and the IO is not shut down
    #[test]
    fn ut_schedule_io_default() {
        let mut schedule_io = ScheduleIO::default();
        assert_eq!(schedule_io.status.load(Acquire), 0);
        assert!(!schedule_io.waiters.get_mut().unwrap().is_shutdown);
    }

    /// UT test cases for schedule_io reset
    ///
    /// # Brief
    /// 1. Create a ScheduleIO
    /// 2. Call reset
    /// 3. Verify that the generation went from 0 to 1
    #[test]
    fn ut_schedule_io_reset() {
        let schedule_io = ScheduleIO::default();
        assert_eq!(schedule_io.status.load(Acquire), 0x00);

        schedule_io.reset();
        assert_eq!(schedule_io.status.load(Acquire), 0x1000000);
    }

    /// UT test cases for schedule_io generation
    ///
    /// # Brief
    /// 1. Create a ScheduleIO
    /// 2. Store a status whose generation bits are all set
    /// 3. Verify the value returned by generation
    #[test]
    fn ut_schedule_io_generation() {
        let schedule_io = ScheduleIO::default();
        schedule_io.status.store(0x7f000000, Release);
        let generation = schedule_io.generation();
        assert_eq!(generation, 0x7f);
    }

    /// UT test cases for schedule_io shutdown
    ///
    /// # Brief
    /// 1. Create a ScheduleIO
    /// 2. Call shutdown
    /// 3. Verify that the shutdown flag is set
    #[test]
    fn ut_schedule_io_shutdown() {
        let mut schedule_io = ScheduleIO::default();
        schedule_io.shutdown();
        let waiters = schedule_io.waiters.get_mut().unwrap();
        assert!(waiters.is_shutdown);
    }

    /// UT test cases for schedule_io clear_readiness
    ///
    /// # Brief
    /// 1. Create a ScheduleIO with the four lowest readiness bits set
    /// 2. Call clear_readiness for the lowest bit
    /// 3. Verify that only that bit was cleared
    #[test]
    fn ut_schedule_io_clear_readiness() {
        let schedule_io = ScheduleIO::default();
        schedule_io.status.store(0x0000000f, Release);

        let event = ReadyEvent::new(0, Ready::from_usize(0x1));
        schedule_io.clear_readiness(event);

        assert_eq!(schedule_io.status.load(Acquire), 0x0000000e);
    }

    /// UT test cases for schedule_io set_readiness
    ///
    /// # Brief
    /// 1. Create a ScheduleIO
    /// 2. Call set_readiness with an expired token, a stale tick and a valid
    ///    request
    /// 3. Verify the returned results
    #[test]
    fn ut_schedule_io_set_readiness() {
        expired_token_is_rejected();
        stale_tick_is_rejected();
        valid_request_sets_tick();

        /// Asserts that `ret` is an `Other` error displaying `message`.
        fn assert_other_error(ret: io::Result<()>, message: &str) {
            let err = ret.err().unwrap();
            assert_eq!(err.kind(), io::ErrorKind::Other);
            assert_eq!(err.to_string(), message);
        }

        fn expired_token_is_rejected() {
            let schedule_io = ScheduleIO::default();
            let token = 0x7f000000usize;
            let ret = schedule_io.set_readiness(Some(token), Tick::Set(1), |curr| curr);
            assert_other_error(ret, "Token no longer valid.");
        }

        fn stale_tick_is_rejected() {
            let schedule_io = ScheduleIO::default();
            let token = 0x00000000usize;
            let ret = schedule_io.set_readiness(Some(token), Tick::Clear(1), |curr| curr);
            assert_other_error(ret, "Readiness has been covered.");
        }

        fn valid_request_sets_tick() {
            let schedule_io = ScheduleIO::default();
            let token = 0x00000000usize;
            let ret = schedule_io.set_readiness(Some(token), Tick::Set(1), |curr| curr);
            assert!(ret.is_ok());
            assert_eq!(schedule_io.status.load(Acquire), 0x00010000);
        }
    }
}
541