// Copyright (c) 2023 Huawei Device Co., Ltd.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cell::UnsafeCell;
use std::future::Future;
use std::io;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::ptr::{addr_of_mut, NonNull};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release, SeqCst};
use std::sync::Mutex;
use std::task::{Context, Poll, Waker};

use ylong_io::Interest;

use crate::futures::poll_fn;
use crate::net::{Ready, ReadyEvent};
use crate::util::bit::{Bit, Mask};
use crate::util::linked_list::{Link, LinkedList, Node};
use crate::util::slab::Entry;

const GENERATION: Mask = Mask::new(7, 24);
pub(crate) const DRIVER_TICK: Mask = Mask::new(8, 16);
pub(crate) const READINESS: Mask = Mask::new(16, 0);

// ScheduleIO::status structure
//
// | reserved | generation | driver tick | readiness |
// |----------|------------|-------------|-----------|
// | 1 bit    | 7 bits     | 8 bits      | 16 bits   |
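//
// For example (an illustrative value, not one taken from a real run), a
// status word of 0x7f01_0003 decomposes under the masks above as:
//
//     let bit = Bit::from_usize(0x7f01_0003);
//     assert_eq!(bit.get_by_mask(GENERATION), 0x7f);   // bits 24..31
//     assert_eq!(bit.get_by_mask(DRIVER_TICK), 0x01);  // bits 16..24
//     assert_eq!(bit.get_by_mask(READINESS), 0x0003);  // bits 0..16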
pub(crate) struct ScheduleIO {
    /// ScheduleIO status
    pub(crate) status: AtomicUsize,

    /// Wakers that wait for this IO
    waiters: Mutex<Waiters>,
}

#[derive(Default)]
pub(crate) struct Waiters {
    list: LinkedList<Waiter>,

    // Reader & writer wakers are for AsyncRead/AsyncWrite
    reader: Option<Waker>,

    writer: Option<Waker>,

    is_shutdown: bool,
}

pub(crate) struct Waiter {
    waker: Option<Waker>,

    interest: Interest,

    is_ready: bool,

    node: Node<Waiter>,

    _p: PhantomPinned,
}

pub(crate) enum Tick {
    Set(u8),
    Clear(u8),
}

impl Default for ScheduleIO {
    fn default() -> Self {
        ScheduleIO {
            status: AtomicUsize::new(0),
            waiters: Mutex::new(Default::default()),
        }
    }
}

impl Default for Waiter {
    fn default() -> Self {
        Waiter {
            waker: None,
            interest: Interest::READABLE,
            is_ready: false,
            node: Node::new(),
            _p: PhantomPinned,
        }
    }
}

unsafe impl Link for Waiter {
    unsafe fn node(mut ptr: NonNull<Self>) -> NonNull<Node<Self>>
    where
        Self: Sized,
    {
        let node_ptr = addr_of_mut!(ptr.as_mut().node);
        NonNull::new_unchecked(node_ptr)
    }
}

impl Entry for ScheduleIO {
    fn reset(&self) {
        let status_bit = Bit::from_usize(self.status.load(Acquire));

        // Bump the generation so that tokens minted against the old
        // generation are rejected by `set_readiness`.
        let generation = status_bit.get_by_mask(GENERATION);
        let new_generation = generation.wrapping_add(1);
        let mut next = Bit::from_usize(0);
        next.set_by_mask(GENERATION, new_generation);
        self.status.store(next.as_usize(), Release);
    }
}

impl ScheduleIO {
    pub fn generation(&self) -> usize {
        let base = Bit::from_usize(self.status.load(Acquire));
        base.get_by_mask(GENERATION)
    }

    #[cfg(feature = "net")]
    pub(crate) fn poll_readiness(
        &self,
        cx: &mut Context<'_>,
        interest: Interest,
    ) -> Poll<ReadyEvent> {
        // Get the current status and check whether it contains our interest
        let curr_bit = Bit::from_usize(self.status.load(Acquire));
        let ready = Ready::from_usize(curr_bit.get_by_mask(READINESS)).intersection(interest);

        if ready.is_empty() {
            let mut waiters = self.waiters.lock().unwrap();
            // Put the waker associated with the context into the waiters
            match interest {
                Interest::WRITABLE => waiters.writer = Some(cx.waker().clone()),
                Interest::READABLE => waiters.reader = Some(cx.waker().clone()),
                _ => unreachable!(),
            }

            // Check one more time to see if any event is ready
            let ready_event = self.get_readiness(interest);
            if !waiters.is_shutdown && ready_event.ready.is_empty() {
                Poll::Pending
            } else {
                Poll::Ready(ready_event)
            }
        } else {
            let tick = curr_bit.get_by_mask(DRIVER_TICK) as u8;
            Poll::Ready(ReadyEvent::new(tick, ready))
        }
    }

    #[inline]
    pub(crate) fn get_readiness(&self, interest: Interest) -> ReadyEvent {
        let status_bit = Bit::from_usize(self.status.load(Acquire));
        let ready = Ready::from_usize(status_bit.get_by_mask(READINESS)).intersection(interest);
        let tick = status_bit.get_by_mask(DRIVER_TICK) as u8;
        ReadyEvent::new(tick, ready)
    }

    pub(crate) async fn readiness(&self, interest: Interest) -> io::Result<ReadyEvent> {
        let mut fut = self.readiness_fut(interest);
        let mut fut = unsafe { Pin::new_unchecked(&mut fut) };

        poll_fn(|cx| Pin::new(&mut fut).poll(cx)).await
    }

    async fn readiness_fut(&self, interest: Interest) -> io::Result<ReadyEvent> {
        Readiness::new(self, interest).await
    }

    pub(crate) fn shutdown(&self) {
        self.wake0(Ready::ALL, true);
    }

    pub(crate) fn clear_readiness(&self, ready: ReadyEvent) {
        // Read-closed and write-closed states are permanent, so they are
        // never cleared from the readiness bits.
        let mask_no_closed = ready.get_ready() - Ready::READ_CLOSED - Ready::WRITE_CLOSED;
        let _ = self.set_readiness(None, Tick::Clear(ready.get_tick()), |curr| {
            curr - mask_no_closed
        });
    }
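    // Sets the readiness bits of `status` via a lock-free CAS loop.
    //
    // `token`, when present, carries the generation it was minted with; if
    // that generation no longer matches the current one, the slab entry has
    // been reused and the call is rejected. `Tick::Set` stamps the driver
    // tick unconditionally, while `Tick::Clear` only succeeds if the stored
    // tick still matches, i.e. the event has not been overwritten by a newer
    // driver pass in the meantime.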
    pub(crate) fn set_readiness(
        &self,
        token: Option<usize>,
        tick: Tick,
        f: impl Fn(Ready) -> Ready,
    ) -> io::Result<()> {
        let mut current = self.status.load(Acquire);
        loop {
            let current_bit = Bit::from_usize(current);
            let current_generation = current_bit.get_by_mask(GENERATION);

            // If the token's generation is different from ScheduleIO's
            // generation, the token has already expired.
            if let Some(token) = token {
                if Bit::from_usize(token).get_by_mask(GENERATION) != current_generation {
                    return Err(io::Error::new(
                        io::ErrorKind::Other,
                        "Token no longer valid.",
                    ));
                }
            }

            let current_readiness = Ready::from_usize(current_bit.get_by_mask(READINESS));
            let new_readiness = f(current_readiness);
            let mut new_bit = Bit::from_usize(new_readiness.as_usize());

            match tick {
                Tick::Set(t) => new_bit.set_by_mask(DRIVER_TICK, t as usize),
                // Check the tick to see if this event has already been
                // covered by a newer one; if it has, reject the clear.
                Tick::Clear(t) => {
                    if current_bit.get_by_mask(DRIVER_TICK) as u8 != t {
                        return Err(io::Error::new(
                            io::ErrorKind::Other,
                            "Readiness has been covered.",
                        ));
                    }
                    new_bit.set_by_mask(DRIVER_TICK, t as usize);
                }
            }

            new_bit.set_by_mask(GENERATION, current_generation);
            match self
                .status
                .compare_exchange(current, new_bit.as_usize(), AcqRel, Acquire)
            {
                Ok(_) => return Ok(()),
                // The status has been changed concurrently, so repeat the
                // loop with the actual value.
                Err(actual) => current = actual,
            }
        }
    }

    pub(crate) fn wake(&self, ready: Ready) {
        self.wake0(ready, false);
    }

    fn wake0(&self, ready: Ready, shutdown: bool) {
        // Collect the wakers under the lock, but invoke them only after the
        // lock is released, so that woken tasks do not immediately contend
        // for `waiters`.
        let mut wakers = Vec::new();
        let mut waiters = self.waiters.lock().unwrap();
        waiters.is_shutdown |= shutdown;

        if ready.is_readable() {
            if let Some(waker) = waiters.reader.take() {
                wakers.push(Some(waker));
            }
        }

        if ready.is_writable() {
            if let Some(waker) = waiters.writer.take() {
                wakers.push(Some(waker));
            }
        }

        waiters.list.drain_filtered(|waiter| {
            if ready.satisfies(waiter.interest) {
                if let Some(waker) = waiter.waker.take() {
                    waiter.is_ready = true;
                    wakers.push(Some(waker));
                }
                return true;
            }
            false
        });

        drop(waiters);
        for waker in wakers.iter_mut() {
            waker.take().unwrap().wake();
        }
    }
}

impl Drop for ScheduleIO {
    fn drop(&mut self) {
        self.wake(Ready::ALL);
    }
}

unsafe impl Send for ScheduleIO {}
unsafe impl Sync for ScheduleIO {}

pub(crate) struct Readiness<'a> {
    schedule_io: &'a ScheduleIO,

    state: State,

    waiter: UnsafeCell<Waiter>,
}

enum State {
    Init,
    Waiting,
    Done,
}

impl Readiness<'_> {
    pub(crate) fn new(schedule_io: &ScheduleIO, interest: Interest) -> Readiness<'_> {
        Readiness {
            schedule_io,
            state: State::Init,
            waiter: UnsafeCell::new(Waiter {
                waker: None,
                interest,
                is_ready: false,
                node: Node::new(),
                _p: PhantomPinned,
            }),
        }
    }
}
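// `Readiness` polls through three states:
//
//   Init    -> readiness is checked once without the lock and once under it;
//              if still not ready, the waiter is enqueued in `waiters.list`.
//   Waiting -> the waiter sits in the list until `wake0` marks it ready.
//   Done    -> the driver tick is re-read from `status` and the ready event
//              is returned.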
impl Future for Readiness<'_> {
    type Output = io::Result<ReadyEvent>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let (schedule_io, state, waiter) = unsafe {
            let me = self.get_unchecked_mut();
            (me.schedule_io, &mut me.state, &me.waiter)
        };
        // Safety: `waiter.interest` never changes after initialization.
        let interest = unsafe { (*waiter.get()).interest };
        loop {
            match *state {
                State::Init => {
                    let status_bit = Bit::from_usize(schedule_io.status.load(SeqCst));
                    let readiness = Ready::from_usize(status_bit.get_by_mask(READINESS));
                    let ready = readiness.intersection(interest);

                    // If events are ready, change the status to done
                    if !ready.is_empty() {
                        let tick = status_bit.get_by_mask(DRIVER_TICK) as u8;
                        *state = State::Done;
                        return Poll::Ready(Ok(ReadyEvent::new(tick, ready)));
                    }

                    let mut waiters = schedule_io.waiters.lock().unwrap();

                    let status_bit = Bit::from_usize(schedule_io.status.load(SeqCst));
                    let mut readiness = Ready::from_usize(status_bit.get_by_mask(READINESS));

                    if waiters.is_shutdown {
                        readiness = Ready::ALL;
                    }

                    let ready = readiness.intersection(interest);

                    // Check one more time, under the lock, to see if events
                    // are ready
                    if !ready.is_empty() {
                        let tick = status_bit.get_by_mask(DRIVER_TICK) as u8;
                        *state = State::Done;
                        return Poll::Ready(Ok(ReadyEvent::new(tick, ready)));
                    }

                    unsafe {
                        (*waiter.get()).waker = Some(cx.waker().clone());

                        waiters
                            .list
                            .push_front(NonNull::new_unchecked(waiter.get()));
                    }

                    *state = State::Waiting;
                }
                State::Waiting => {
                    // `waiters` could also be accessed from other places, so
                    // take the lock first
                    let waiters = schedule_io.waiters.lock().unwrap();

                    let waiter = unsafe { &mut *waiter.get() };
                    if waiter.is_ready {
                        *state = State::Done;
                    } else {
                        if !waiter.waker.as_ref().unwrap().will_wake(cx.waker()) {
                            waiter.waker = Some(cx.waker().clone());
                        }
                        return Poll::Pending;
                    }
                    drop(waiters);
                }
                State::Done => {
                    let status_bit = Bit::from_usize(schedule_io.status.load(Acquire));
                    return Poll::Ready(Ok(ReadyEvent::new(
                        status_bit.get_by_mask(DRIVER_TICK) as u8,
                        Ready::from_interest(interest),
                    )));
                }
            }
        }
    }
}

unsafe impl Sync for Readiness<'_> {}
unsafe impl Send for Readiness<'_> {}
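// Dropping a pending `Readiness` future (e.g. when the caller's task is
// cancelled) must unlink its waiter from `ScheduleIO::waiters`; otherwise
// the intrusive list would keep a dangling pointer into the dropped future.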
impl Drop for Readiness<'_> {
    fn drop(&mut self) {
        let mut waiters = self.schedule_io.waiters.lock().unwrap();
        // Safety: There is only one queue holding the node, and this is the
        // only way for the node to dequeue.
        unsafe {
            waiters
                .list
                .remove(NonNull::new_unchecked(self.waiter.get()));
        }
    }
}

#[cfg(test)]
mod schedule_io_test {
    use std::io;
    use std::sync::atomic::Ordering::{Acquire, Release};

    use crate::net::{Ready, ReadyEvent, ScheduleIO, Tick};
    use crate::util::slab::Entry;

    /// UT test cases for schedule_io default
    ///
    /// # Brief
    /// 1. Call default
    /// 2. Verify the returned results
    #[test]
    fn ut_schedule_io_default() {
        let mut schedule_io = ScheduleIO::default();
        let status = schedule_io.status.load(Acquire);
        assert_eq!(status, 0);
        let is_shutdown = schedule_io.waiters.get_mut().unwrap().is_shutdown;
        assert!(!is_shutdown);
    }

    /// UT test cases for schedule_io reset
    ///
    /// # Brief
    /// 1. Create a ScheduleIO
    /// 2. Call reset
    /// 3. Verify the returned results
    #[test]
    fn ut_schedule_io_reset() {
        let schedule_io = ScheduleIO::default();
        let pre_status = schedule_io.status.load(Acquire);
        assert_eq!(pre_status, 0x00);
        schedule_io.reset();
        let after_status = schedule_io.status.load(Acquire);
        assert_eq!(after_status, 0x1000000);
    }

    /// UT test cases for schedule_io generation
    ///
    /// # Brief
    /// 1. Create a ScheduleIO
    /// 2. Call generation
    /// 3. Verify the returned results
    #[test]
    fn ut_schedule_io_generation() {
        let schedule_io = ScheduleIO::default();
        schedule_io.status.store(0x7f000000, Release);
        assert_eq!(schedule_io.generation(), 0x7f);
    }

    /// UT test cases for schedule_io shutdown
    ///
    /// # Brief
    /// 1. Create a ScheduleIO
    /// 2. Call shutdown
    /// 3. Verify the returned results
    #[test]
    fn ut_schedule_io_shutdown() {
        let mut schedule_io = ScheduleIO::default();
        schedule_io.shutdown();
        assert!(schedule_io.waiters.get_mut().unwrap().is_shutdown);
    }

    /// UT test cases for schedule_io clear_readiness
    ///
    /// # Brief
    /// 1. Create a ScheduleIO
    /// 2. Call clear_readiness
    /// 3. Verify the returned results
    #[test]
    fn ut_schedule_io_clear_readiness() {
        let schedule_io = ScheduleIO::default();
        schedule_io.status.store(0x0000000f, Release);
        schedule_io.clear_readiness(ReadyEvent::new(0, Ready::from_usize(0x1)));
        let status = schedule_io.status.load(Acquire);
        assert_eq!(status, 0x0000000e);
    }

    /// UT test cases for schedule_io set_readiness
    ///
    /// # Brief
    /// 1. Create a ScheduleIO
    /// 2. Call set_readiness
    /// 3. Verify the returned results
    #[test]
    fn ut_schedule_io_set_readiness() {
        ut_schedule_io_set_readiness_01();
        ut_schedule_io_set_readiness_02();
        ut_schedule_io_set_readiness_03();

        // An expired token (generation mismatch) must be rejected.
        fn ut_schedule_io_set_readiness_01() {
            let schedule_io = ScheduleIO::default();
            let token = 0x7f000000usize;
            let ret = schedule_io.set_readiness(Some(token), Tick::Set(1), |curr| curr);
            let err = ret.err().unwrap();
            assert_eq!(err.kind(), io::ErrorKind::Other);
            assert_eq!(
                format!("{}", err.into_inner().unwrap()),
                "Token no longer valid."
            );
        }

        // Clearing with a stale driver tick must be rejected.
        fn ut_schedule_io_set_readiness_02() {
            let schedule_io = ScheduleIO::default();
            let token = 0x00000000usize;
            let ret = schedule_io.set_readiness(Some(token), Tick::Clear(1), |curr| curr);
            let err = ret.err().unwrap();
            assert_eq!(err.kind(), io::ErrorKind::Other);
            assert_eq!(
                format!("{}", err.into_inner().unwrap()),
                "Readiness has been covered."
            );
        }

        // Setting with a valid token stamps the driver tick.
        fn ut_schedule_io_set_readiness_03() {
            let schedule_io = ScheduleIO::default();
            let token = 0x00000000usize;
            let ret = schedule_io.set_readiness(Some(token), Tick::Set(1), |curr| curr);
            assert!(ret.is_ok());
            let status = schedule_io.status.load(Acquire);
            assert_eq!(status, 0x00010000);
        }
    }
}
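
// A minimal additional sketch of `get_readiness` round-tripping through the
// status word. The tick value and the use of `Ready::from_interest` to build
// the stored readiness bits are illustrative choices, not values taken from
// a real driver pass; the module name is likewise new.
#[cfg(test)]
mod schedule_io_get_readiness_example {
    use std::sync::atomic::Ordering::Release;

    use ylong_io::Interest;

    use crate::net::{Ready, ScheduleIO};

    /// Stores readable readiness with driver tick 1, then checks that
    /// `get_readiness` reports both back for a READABLE interest.
    #[test]
    fn ut_schedule_io_get_readiness_example() {
        let schedule_io = ScheduleIO::default();
        // Tick lives in bits 16..24 (DRIVER_TICK), readiness in bits 0..16.
        let readiness = Ready::from_interest(Interest::READABLE).as_usize();
        schedule_io.status.store((1usize << 16) | readiness, Release);

        let event = schedule_io.get_readiness(Interest::READABLE);
        assert_eq!(event.get_tick(), 1);
        assert!(!event.get_ready().is_empty());
    }
}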