1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13
14 use std::io;
15 use std::ops::Deref;
16 #[cfg(feature = "metrics")]
17 use std::sync::atomic::AtomicUsize;
18 use std::sync::{Arc, Mutex};
19
20 use ylong_io::{Interest, Source, Token};
21
22 use crate::net::{Ready, ScheduleIO, Tick};
23 use crate::util::bit::{Bit, Mask};
24 use crate::util::slab::{Address, Ref, Slab};
25
26 cfg_ffrt! {
27 use libc::{c_void, c_int, c_uint, c_uchar};
28 }
29
30 cfg_not_ffrt! {
31 use ylong_io::{Events, Poll};
32 use std::time::Duration;
33
34 const EVENTS_MAX_CAPACITY: usize = 1024;
35 const WAKE_TOKEN: Token = Token(1 << 31);
36 }
37
38 const DRIVER_TICK_INIT: u8 = 0;
39
40 // Token structure
41 // | reserved | generation | address |
42 // |----------|------------|---------|
43 // | 1 bit | 7 bits | 24 bits |
44 // const RESERVED: Mask = Mask::new(1, 31);
45 const GENERATION: Mask = Mask::new(7, 24);
46 const ADDRESS: Mask = Mask::new(24, 0);
47
48 /// IO reactor that listens to fd events and wakes corresponding tasks.
49 pub(crate) struct IoDriver {
50 /// Stores every IO source that is ready
51 resources: Option<Slab<ScheduleIO>>,
52
53 /// Counter used for slab struct to compact
54 tick: u8,
55
56 /// Used for epoll
57 #[cfg(not(feature = "ffrt"))]
58 poll: Arc<Poll>,
59
60 /// Stores IO events that need to be handled
61 #[cfg(not(feature = "ffrt"))]
62 events: Option<Events>,
63
64 /// Save Handle used in metrics.
65 #[cfg(feature = "metrics")]
66 io_handle_inner: Arc<Inner>,
67 }
68
69 pub(crate) struct IoHandle {
70 inner: Arc<Inner>,
71 #[cfg(not(feature = "ffrt"))]
72 pub(crate) waker: ylong_io::Waker,
73 }
74
75 cfg_ffrt!(
76 use std::mem::MaybeUninit;
77 static mut DRIVER: MaybeUninit<IoDriver> = MaybeUninit::uninit();
78 static mut HANDLE: MaybeUninit<IoHandle> = MaybeUninit::uninit();
79 );
80
#[cfg(feature = "ffrt")]
impl IoHandle {
    /// Wraps the shared `Inner` into a handle.
    fn new(inner: Arc<Inner>) -> Self {
        Self { inner }
    }

    /// Returns the global handle, running the one-time driver initialization
    /// first if it has not happened yet.
    pub(crate) fn get_ref() -> &'static Self {
        IoDriver::initialize();
        // `initialize` has returned, so `HANDLE` has been written by its
        // `Once` block before this pointer is read.
        let handle: *const IoHandle = unsafe { HANDLE.as_ptr() };
        unsafe { &*handle }
    }
}
92
93 #[cfg(not(feature = "ffrt"))]
94 impl IoHandle {
new(inner: Arc<Inner>, waker: ylong_io::Waker) -> Self95 fn new(inner: Arc<Inner>, waker: ylong_io::Waker) -> Self {
96 IoHandle { inner, waker }
97 }
98
99 #[cfg(feature = "metrics")]
get_register_count(&self) -> usize100 pub(crate) fn get_register_count(&self) -> usize {
101 self.inner
102 .metrics
103 .register_count
104 .load(std::sync::atomic::Ordering::Acquire)
105 }
106
107 #[cfg(feature = "metrics")]
get_ready_count(&self) -> usize108 pub(crate) fn get_ready_count(&self) -> usize {
109 self.inner
110 .metrics
111 .ready_count
112 .load(std::sync::atomic::Ordering::Acquire)
113 }
114 }
115
116 impl Deref for IoHandle {
117 type Target = Arc<Inner>;
118
deref(&self) -> &Self::Target119 fn deref(&self) -> &Self::Target {
120 &self.inner
121 }
122 }
123
124 /// In charge of two functionalities
125 ///
126 /// 1)IO registration
127 /// 2)Resource management
128 pub(crate) struct Inner {
129 /// When the driver gets dropped, the resources in the driver will be
130 /// transmitted to here. Then all the slabs inside will get dropped when
131 /// Inner's ref count clears to zero, so there is no concurrent problem
132 /// when new slabs gets inserted
133 resources: Mutex<Option<Slab<ScheduleIO>>>,
134
135 /// Used to register scheduleIO into the slab
136 allocator: Slab<ScheduleIO>,
137
138 /// Used to register fd
139 #[cfg(not(feature = "ffrt"))]
140 registry: Arc<Poll>,
141
142 /// Metrics
143 #[cfg(feature = "metrics")]
144 metrics: InnerMetrics,
145 }
146
/// Counters kept by `Inner` when the `metrics` feature is enabled.
#[cfg(feature = "metrics")]
struct InnerMetrics {
    /// Count of source registrations; it is only ever incremented.
    register_count: AtomicUsize,

    /// Count of events returned by the poll; it is only ever incremented.
    ready_count: AtomicUsize,
}
156
157 impl IoDriver {
158 /// IO dispatch function. Wakes the task through the token getting from the
159 /// epoll events.
dispatch(&mut self, token: Token, ready: Ready)160 fn dispatch(&mut self, token: Token, ready: Ready) {
161 let addr_bit = Bit::from_usize(token.0);
162 let addr = addr_bit.get_by_mask(ADDRESS);
163
164 let io = match self
165 .resources
166 .as_mut()
167 .unwrap()
168 .get(Address::from_usize(addr))
169 {
170 Some(io) => io,
171 None => return,
172 };
173
174 if io
175 .set_readiness(Some(token.0), Tick::Set(self.tick), |curr| curr | ready)
176 .is_err()
177 {
178 return;
179 }
180
181 // Wake the io task
182 io.wake(ready)
183 }
184 }
185
186 #[cfg(not(feature = "ffrt"))]
187 impl IoDriver {
initialize() -> (IoHandle, IoDriver)188 pub(crate) fn initialize() -> (IoHandle, IoDriver) {
189 let poll = Poll::new().unwrap();
190 let waker =
191 ylong_io::Waker::new(&poll, WAKE_TOKEN).expect("ylong_io waker construction failed");
192 let arc_poll = Arc::new(poll);
193 let events = Events::with_capacity(EVENTS_MAX_CAPACITY);
194 let slab = Slab::new();
195 let allocator = slab.handle();
196 let inner = Arc::new(Inner {
197 resources: Mutex::new(None),
198 allocator,
199 registry: arc_poll.clone(),
200 #[cfg(feature = "metrics")]
201 metrics: InnerMetrics {
202 register_count: AtomicUsize::new(0),
203 ready_count: AtomicUsize::new(0),
204 },
205 });
206
207 let driver = IoDriver {
208 resources: Some(slab),
209 events: Some(events),
210 tick: DRIVER_TICK_INIT,
211 poll: arc_poll,
212 #[cfg(feature = "metrics")]
213 io_handle_inner: inner.clone(),
214 };
215
216 (IoHandle::new(inner, waker), driver)
217 }
218
219 /// Runs the driver. This method will blocking wait for fd events to come in
220 /// and then wakes the corresponding tasks through the events.
221 ///
222 /// In linux environment, the driver uses epoll.
drive(&mut self, time_out: Option<Duration>) -> io::Result<bool>223 pub(crate) fn drive(&mut self, time_out: Option<Duration>) -> io::Result<bool> {
224 use ylong_io::EventTrait;
225
226 // For every 255 ticks, cleans the redundant entries inside the slab
227 const COMPACT_INTERVAL: u8 = 255;
228
229 self.tick = self.tick.wrapping_add(1);
230
231 if self.tick == COMPACT_INTERVAL {
232 unsafe {
233 self.resources.as_mut().unwrap().compact();
234 }
235 }
236
237 let mut events = match self.events.take() {
238 Some(ev) => ev,
239 None => {
240 let err = io::Error::new(io::ErrorKind::Other, "driver event store missing.");
241 return Err(err);
242 }
243 };
244
245 match self.poll.poll(&mut events, time_out) {
246 Ok(_) => {}
247 Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
248 Err(err) => return Err(err),
249 }
250
251 let has_events = !events.is_empty();
252
253 for event in events.iter() {
254 let token = event.token();
255 if token == WAKE_TOKEN {
256 continue;
257 }
258 let ready = Ready::from_event(event);
259 self.dispatch(token, ready);
260 }
261 #[cfg(feature = "metrics")]
262 self.io_handle_inner
263 .metrics
264 .ready_count
265 .fetch_add(events.len(), std::sync::atomic::Ordering::AcqRel);
266
267 self.events = Some(events);
268 Ok(has_events)
269 }
270 }
271
#[cfg(feature = "ffrt")]
impl IoDriver {
    /// Creates the global driver and handle. The body runs at most once per
    /// process; later calls return immediately.
    fn initialize() {
        static ONCE: std::sync::Once = std::sync::Once::new();
        ONCE.call_once(|| {
            let slab = Slab::new();
            let allocator = slab.handle();
            let inner = Arc::new(Inner {
                resources: Mutex::new(None),
                allocator,
            });
            let handle = IoHandle::new(inner);

            let driver = IoDriver {
                resources: Some(slab),
                tick: DRIVER_TICK_INIT,
            };

            // The statics are written only here, inside the `Once` block.
            unsafe {
                HANDLE = MaybeUninit::new(handle);
                DRIVER = MaybeUninit::new(driver);
            }
        });
    }

    /// Returns the global driver, running the one-time initialization first
    /// if it has not happened yet.
    pub(crate) fn get_mut_ref() -> &'static mut IoDriver {
        IoDriver::initialize();
        let driver: *mut IoDriver = unsafe { DRIVER.as_mut_ptr() };
        unsafe { &mut *driver }
    }
}
299
/// Callback handed to the ffrt poller at registration time.
///
/// `data` carries the token value passed at registration, `ready` the raw
/// event bits, and `new_tick` the tick to store in the driver. The slab is
/// compacted when the tick changes to `COMPACT_INTERVAL`.
#[cfg(feature = "ffrt")]
extern "C" fn ffrt_dispatch_event(data: *const c_void, ready: c_uint, new_tick: c_uchar) {
    const COMPACT_INTERVAL: u8 = 255;

    let driver = IoDriver::get_mut_ref();

    // Compact only on the transition into the compaction tick, not for
    // every event delivered during that tick.
    let tick_changed = driver.tick != new_tick;
    if tick_changed && new_tick == COMPACT_INTERVAL {
        unsafe {
            driver.resources.as_mut().unwrap().compact();
        }
    }
    driver.tick = new_tick;

    let token = Token::from_usize(data as usize);
    let readiness = crate::net::ready::from_event_inner(ready as i32);
    driver.dispatch(token, readiness);
}
317
318 impl Inner {
319 /// Registers the fd of the `Source` object
320 #[cfg(not(feature = "ffrt"))]
register_source( &self, io: &mut impl Source, interest: Interest, ) -> io::Result<Ref<ScheduleIO>>321 pub(crate) fn register_source(
322 &self,
323 io: &mut impl Source,
324 interest: Interest,
325 ) -> io::Result<Ref<ScheduleIO>> {
326 // Allocates space for the slab. If reaches maximum capacity, error will be
327 // returned
328 let (schedule_io, token) = self.allocate_schedule_io_pair()?;
329
330 self.registry
331 .register(io, Token::from_usize(token), interest)?;
332 #[cfg(feature = "metrics")]
333 self.metrics
334 .register_count
335 .fetch_add(1, std::sync::atomic::Ordering::AcqRel);
336 Ok(schedule_io)
337 }
338
allocate_schedule_io_pair(&self) -> io::Result<(Ref<ScheduleIO>, usize)>339 fn allocate_schedule_io_pair(&self) -> io::Result<(Ref<ScheduleIO>, usize)> {
340 let (addr, schedule_io) = unsafe {
341 self.allocator.allocate().ok_or_else(|| {
342 io::Error::new(
343 io::ErrorKind::Other,
344 "driver at max registered I/O resources.",
345 )
346 })?
347 };
348 let mut base = Bit::from_usize(0);
349 base.set_by_mask(GENERATION, schedule_io.generation());
350 base.set_by_mask(ADDRESS, addr.as_usize());
351 Ok((schedule_io, base.as_usize()))
352 }
353
354 /// Registers the fd of the `Source` object
355 #[cfg(feature = "ffrt")]
register_source( &self, io: &mut impl Source, interest: Interest, ) -> io::Result<Ref<ScheduleIO>>356 pub(crate) fn register_source(
357 &self,
358 io: &mut impl Source,
359 interest: Interest,
360 ) -> io::Result<Ref<ScheduleIO>> {
361 // Allocates space for the slab. If reaches maximum capacity, error will be
362 // returned
363 let (schedule_io, token) = self.allocate_schedule_io_pair()?;
364
365 fn interests_to_io_event(interests: Interest) -> c_uint {
366 let mut io_event = libc::EPOLLET as u32;
367
368 if interests.is_readable() {
369 io_event |= libc::EPOLLIN as u32;
370 io_event |= libc::EPOLLRDHUP as u32;
371 }
372
373 if interests.is_writable() {
374 io_event |= libc::EPOLLOUT as u32;
375 }
376
377 io_event as c_uint
378 }
379
380 let event = interests_to_io_event(interest);
381 unsafe {
382 ylong_ffrt::ffrt_poller_register(
383 io.as_raw_fd() as c_int,
384 event,
385 token as *const c_void,
386 ffrt_dispatch_event,
387 );
388 }
389
390 Ok(schedule_io)
391 }
392
393 /// Deregisters the fd of the `Source` object.
394 #[cfg(not(feature = "ffrt"))]
deregister_source(&self, io: &mut impl Source) -> io::Result<()>395 pub(crate) fn deregister_source(&self, io: &mut impl Source) -> io::Result<()> {
396 self.registry.deregister(io)
397 }
398 }
399
400 impl Drop for Inner {
drop(&mut self)401 fn drop(&mut self) {
402 let resources = self.resources.lock().unwrap().take();
403
404 if let Some(mut slab) = resources {
405 slab.for_each(|io| {
406 io.shutdown();
407 });
408 }
409 }
410 }
411