// Copyright (c) 2023 Huawei Device Co., Ltd.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::io;
use std::ops::Deref;
#[cfg(feature = "metrics")]
use std::sync::atomic::AtomicUsize;
use std::sync::{Arc, Mutex};

use ylong_io::{Interest, Source, Token};

use crate::net::{Ready, ScheduleIO, Tick};
use crate::util::bit::{Bit, Mask};
use crate::util::slab::{Address, Ref, Slab};

cfg_ffrt! {
    use libc::{c_void, c_int, c_uint, c_uchar};
}

cfg_not_ffrt! {
    use ylong_io::{Events, Poll};
    use std::time::Duration;

    const EVENTS_MAX_CAPACITY: usize = 1024;
    const WAKE_TOKEN: Token = Token(1 << 31);
}

const DRIVER_TICK_INIT: u8 = 0;

// Token structure
// | reserved | generation | address |
// |----------|------------|---------|
// |   1 bit  |   7 bits   | 24 bits |
// const RESERVED: Mask = Mask::new(1, 31);
const GENERATION: Mask = Mask::new(7, 24);
const ADDRESS: Mask = Mask::new(24, 0);
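
// The following test module is a minimal sketch added for illustration; it is
// not part of the original file. It shows how a token is packed from a
// generation and a slab address and unpacked again, using only the `Bit` and
// `Mask` helpers already imported above. The concrete values (generation 3,
// address 42) are arbitrary.
#[cfg(test)]
mod token_layout_sketch {
    use super::*;

    #[test]
    fn token_round_trip() {
        // Pack generation 3 and slab address 42 into a single token value,
        // mirroring `Inner::allocate_schedule_io_pair`.
        let mut base = Bit::from_usize(0);
        base.set_by_mask(GENERATION, 3);
        base.set_by_mask(ADDRESS, 42);
        let token = base.as_usize();

        // Unpack the fields again, mirroring `IoDriver::dispatch`.
        let bit = Bit::from_usize(token);
        assert_eq!(bit.get_by_mask(GENERATION), 3);
        assert_eq!(bit.get_by_mask(ADDRESS), 42);
    }
}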

/// IO reactor that listens to fd events and wakes corresponding tasks.
pub(crate) struct IoDriver {
    /// Stores every registered IO source
    resources: Option<Slab<ScheduleIO>>,

    /// Counter that decides when the slab gets compacted
    tick: u8,

    /// Used for epoll
    #[cfg(not(feature = "ffrt"))]
    poll: Arc<Poll>,

    /// Stores IO events that need to be handled
    #[cfg(not(feature = "ffrt"))]
    events: Option<Events>,

    /// Saves the handle used for metrics.
    #[cfg(feature = "metrics")]
    io_handle_inner: Arc<Inner>,
}

pub(crate) struct IoHandle {
    inner: Arc<Inner>,
    #[cfg(not(feature = "ffrt"))]
    pub(crate) waker: ylong_io::Waker,
}

cfg_ffrt!(
    use std::mem::MaybeUninit;
    static mut DRIVER: MaybeUninit<IoDriver> = MaybeUninit::uninit();
    static mut HANDLE: MaybeUninit<IoHandle> = MaybeUninit::uninit();
);

#[cfg(feature = "ffrt")]
impl IoHandle {
    fn new(inner: Arc<Inner>) -> Self {
        IoHandle { inner }
    }

    pub(crate) fn get_ref() -> &'static Self {
        IoDriver::initialize();
        unsafe { &*HANDLE.as_ptr() }
    }
}

#[cfg(not(feature = "ffrt"))]
impl IoHandle {
    fn new(inner: Arc<Inner>, waker: ylong_io::Waker) -> Self {
        IoHandle { inner, waker }
    }

    #[cfg(feature = "metrics")]
    pub(crate) fn get_register_count(&self) -> usize {
        self.inner
            .metrics
            .register_count
            .load(std::sync::atomic::Ordering::Acquire)
    }

    #[cfg(feature = "metrics")]
    pub(crate) fn get_ready_count(&self) -> usize {
        self.inner
            .metrics
            .ready_count
            .load(std::sync::atomic::Ordering::Acquire)
    }
}

impl Deref for IoHandle {
    type Target = Arc<Inner>;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

/// In charge of two functionalities
///
/// 1) IO registration
/// 2) Resource management
pub(crate) struct Inner {
    /// When the driver gets dropped, its resources are transferred here. The
    /// slabs inside are then dropped once Inner's reference count reaches
    /// zero, so there is no concurrency problem when new slabs get inserted
    resources: Mutex<Option<Slab<ScheduleIO>>>,

    /// Used to register `ScheduleIO` into the slab
    allocator: Slab<ScheduleIO>,

    /// Used to register fds
    #[cfg(not(feature = "ffrt"))]
    registry: Arc<Poll>,

    /// Metrics
    #[cfg(feature = "metrics")]
    metrics: InnerMetrics,
}

/// Metrics of Inner
#[cfg(feature = "metrics")]
struct InnerMetrics {
    /// Register count. This value only increments; it never decreases.
    register_count: AtomicUsize,

    /// Ready events count. This value only increments; it never decreases.
    ready_count: AtomicUsize,
}

impl IoDriver {
    /// IO dispatch function. Wakes the task through the token obtained from
    /// the epoll events.
    fn dispatch(&mut self, token: Token, ready: Ready) {
        let addr_bit = Bit::from_usize(token.0);
        let addr = addr_bit.get_by_mask(ADDRESS);

        let io = match self
            .resources
            .as_mut()
            .unwrap()
            .get(Address::from_usize(addr))
        {
            Some(io) => io,
            None => return,
        };

        if io
            .set_readiness(Some(token.0), Tick::Set(self.tick), |curr| curr | ready)
            .is_err()
        {
            return;
        }

        // Wake the io task
        io.wake(ready)
    }
}

#[cfg(not(feature = "ffrt"))]
impl IoDriver {
    pub(crate) fn initialize() -> (IoHandle, IoDriver) {
        let poll = Poll::new().unwrap();
        let waker =
            ylong_io::Waker::new(&poll, WAKE_TOKEN).expect("ylong_io waker construction failed");
        let arc_poll = Arc::new(poll);
        let events = Events::with_capacity(EVENTS_MAX_CAPACITY);
        let slab = Slab::new();
        let allocator = slab.handle();
        let inner = Arc::new(Inner {
            resources: Mutex::new(None),
            allocator,
            registry: arc_poll.clone(),
            #[cfg(feature = "metrics")]
            metrics: InnerMetrics {
                register_count: AtomicUsize::new(0),
                ready_count: AtomicUsize::new(0),
            },
        });

        let driver = IoDriver {
            resources: Some(slab),
            events: Some(events),
            tick: DRIVER_TICK_INIT,
            poll: arc_poll,
            #[cfg(feature = "metrics")]
            io_handle_inner: inner.clone(),
        };

        (IoHandle::new(inner, waker), driver)
    }

    /// Runs the driver. This method blocks waiting for fd events to come in
    /// and then wakes the corresponding tasks through those events.
    ///
    /// In a Linux environment, the driver uses epoll.
    pub(crate) fn drive(&mut self, time_out: Option<Duration>) -> io::Result<bool> {
        use ylong_io::EventTrait;

        // Every 255 ticks, the redundant entries inside the slab get cleaned up
        const COMPACT_INTERVAL: u8 = 255;

        self.tick = self.tick.wrapping_add(1);

        if self.tick == COMPACT_INTERVAL {
            unsafe {
                self.resources.as_mut().unwrap().compact();
            }
        }

        let mut events = match self.events.take() {
            Some(ev) => ev,
            None => {
                let err = io::Error::new(io::ErrorKind::Other, "driver event store missing.");
                return Err(err);
            }
        };

        match self.poll.poll(&mut events, time_out) {
            Ok(_) => {}
            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
            Err(err) => return Err(err),
        }

        let has_events = !events.is_empty();

        for event in events.iter() {
            let token = event.token();
            if token == WAKE_TOKEN {
                continue;
            }
            let ready = Ready::from_event(event);
            self.dispatch(token, ready);
        }
        #[cfg(feature = "metrics")]
        self.io_handle_inner
            .metrics
            .ready_count
            .fetch_add(events.len(), std::sync::atomic::Ordering::AcqRel);

        self.events = Some(events);
        Ok(has_events)
    }
}
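
// A minimal usage sketch added for illustration; it is not part of the
// original file and is not the crate's real runtime loop. It only relies on
// `IoDriver::initialize` and `IoDriver::drive` as defined above, and the 1 ms
// timeout is an arbitrary choice: with no fds registered, a short poll should
// simply time out and report that no events were handled.
#[cfg(all(test, not(feature = "ffrt")))]
mod drive_sketch {
    use std::time::Duration;

    use super::*;

    #[test]
    fn drive_once_without_registered_sources() {
        // Build the driver and its handle, then poll a single time.
        let (_handle, mut driver) = IoDriver::initialize();
        let has_events = driver
            .drive(Some(Duration::from_millis(1)))
            .expect("driving the reactor failed");
        assert!(!has_events);
    }
}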

#[cfg(feature = "ffrt")]
impl IoDriver {
    fn initialize() {
        static ONCE: std::sync::Once = std::sync::Once::new();
        ONCE.call_once(|| unsafe {
            let slab = Slab::new();
            let allocator = slab.handle();
            let inner = Arc::new(Inner {
                resources: Mutex::new(None),
                allocator,
            });

            let driver = IoDriver {
                resources: Some(slab),
                tick: DRIVER_TICK_INIT,
            };
            HANDLE = MaybeUninit::new(IoHandle::new(inner));
            DRIVER = MaybeUninit::new(driver);
        });
    }

    /// Initializes the single-instance IO driver if necessary and returns a
    /// mutable reference to it.
    pub(crate) fn get_mut_ref() -> &'static mut IoDriver {
        IoDriver::initialize();
        unsafe { &mut *DRIVER.as_mut_ptr() }
    }
}

#[cfg(feature = "ffrt")]
extern "C" fn ffrt_dispatch_event(data: *const c_void, ready: c_uint, new_tick: c_uchar) {
    const COMPACT_INTERVAL: u8 = 255;

    let driver = IoDriver::get_mut_ref();

    if new_tick == COMPACT_INTERVAL && driver.tick != new_tick {
        unsafe {
            driver.resources.as_mut().unwrap().compact();
        }
    }
    driver.tick = new_tick;

    let token = Token::from_usize(data as usize);
    let ready = crate::net::ready::from_event_inner(ready as i32);
    driver.dispatch(token, ready);
}

impl Inner {
    /// Registers the fd of the `Source` object
    #[cfg(not(feature = "ffrt"))]
    pub(crate) fn register_source(
        &self,
        io: &mut impl Source,
        interest: Interest,
    ) -> io::Result<Ref<ScheduleIO>> {
        // Allocates space in the slab. If the maximum capacity is reached, an
        // error is returned
        let (schedule_io, token) = self.allocate_schedule_io_pair()?;

        self.registry
            .register(io, Token::from_usize(token), interest)?;
        #[cfg(feature = "metrics")]
        self.metrics
            .register_count
            .fetch_add(1, std::sync::atomic::Ordering::AcqRel);
        Ok(schedule_io)
    }

    fn allocate_schedule_io_pair(&self) -> io::Result<(Ref<ScheduleIO>, usize)> {
        let (addr, schedule_io) = unsafe {
            self.allocator.allocate().ok_or_else(|| {
                io::Error::new(
                    io::ErrorKind::Other,
                    "driver at max registered I/O resources.",
                )
            })?
        };
        let mut base = Bit::from_usize(0);
        base.set_by_mask(GENERATION, schedule_io.generation());
        base.set_by_mask(ADDRESS, addr.as_usize());
        Ok((schedule_io, base.as_usize()))
    }

    /// Registers the fd of the `Source` object
    #[cfg(feature = "ffrt")]
    pub(crate) fn register_source(
        &self,
        io: &mut impl Source,
        interest: Interest,
    ) -> io::Result<Ref<ScheduleIO>> {
        // Allocates space in the slab. If the maximum capacity is reached, an
        // error is returned
        let (schedule_io, token) = self.allocate_schedule_io_pair()?;

        fn interests_to_io_event(interests: Interest) -> c_uint {
            let mut io_event = libc::EPOLLET as u32;

            if interests.is_readable() {
                io_event |= libc::EPOLLIN as u32;
                io_event |= libc::EPOLLRDHUP as u32;
            }

            if interests.is_writable() {
                io_event |= libc::EPOLLOUT as u32;
            }

            io_event as c_uint
        }

        let event = interests_to_io_event(interest);
        unsafe {
            ylong_ffrt::ffrt_poller_register(
                io.as_raw_fd() as c_int,
                event,
                token as *const c_void,
                ffrt_dispatch_event,
            );
        }

        Ok(schedule_io)
    }

    /// Deregisters the fd of the `Source` object.
    #[cfg(not(feature = "ffrt"))]
    pub(crate) fn deregister_source(&self, io: &mut impl Source) -> io::Result<()> {
        self.registry.deregister(io)
    }
}

impl Drop for Inner {
    fn drop(&mut self) {
        let resources = self.resources.lock().unwrap().take();

        if let Some(mut slab) = resources {
            slab.for_each(|io| {
                io.shutdown();
            });
        }
    }
}