1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 use std::sync::atomic::AtomicUsize; 15 use std::sync::atomic::Ordering::SeqCst; 16 use std::sync::{Arc, Condvar, Mutex}; 17 use std::thread; 18 use std::time::Duration; 19 20 use crate::executor::driver::{Driver, Handle, ParkFlag}; 21 22 #[derive(Clone)] 23 pub(crate) struct Parker { 24 inner: Arc<Inner>, 25 } 26 27 struct Inner { 28 state: AtomicUsize, 29 mutex: Mutex<()>, 30 condvar: Condvar, 31 driver: Arc<Mutex<Driver>>, 32 } 33 34 const IDLE: usize = 0; 35 const PARKED_ON_CONDVAR: usize = 1; 36 const PARKED_ON_DRIVER: usize = 2; 37 const NOTIFIED: usize = 3; 38 39 impl Parker { new(driver: Arc<Mutex<Driver>>) -> Parker40 pub(crate) fn new(driver: Arc<Mutex<Driver>>) -> Parker { 41 Parker { 42 inner: Arc::new(Inner { 43 state: AtomicUsize::new(IDLE), 44 mutex: Mutex::new(()), 45 condvar: Condvar::new(), 46 driver, 47 }), 48 } 49 } 50 park(&mut self)51 pub(crate) fn park(&mut self) { 52 self.inner.park() 53 } 54 unpark(&self, handle: Arc<Handle>)55 pub(crate) fn unpark(&self, handle: Arc<Handle>) { 56 self.inner.unpark(handle); 57 } 58 get_driver(&self) -> &Arc<Mutex<Driver>>59 pub(crate) fn get_driver(&self) -> &Arc<Mutex<Driver>> { 60 self.inner.get_driver() 61 } 62 release(&self)63 pub(crate) fn release(&self) { 64 self.inner.release(); 65 } 66 67 #[cfg(feature = "metrics")] get_state(&self) -> usize68 pub(crate) fn get_state(&self) -> usize { 69 self.inner.get_state() 70 } 71 } 72 73 impl Inner { park(&self)74 fn park(&self) { 75 // loop to reduce the chance of parking the thread 76 for _ in 0..3 { 77 if self 78 .state 79 .compare_exchange(NOTIFIED, IDLE, SeqCst, SeqCst) 80 .is_ok() 81 { 82 return; 83 } 84 thread::yield_now(); 85 } 86 87 let mut park_flag = ParkFlag::Park; 88 if let Ok(mut driver) = self.driver.try_lock() { 89 park_flag = self.park_on_driver(&mut driver); 90 } 91 92 match park_flag { 93 ParkFlag::NotPark => {} 94 ParkFlag::Park => self.park_on_condvar_timeout(None), 95 ParkFlag::ParkTimeout(duration) => self.park_on_condvar_timeout(Some(duration)), 96 } 97 } 98 park_on_driver(&self, driver: &mut Driver) -> ParkFlag99 fn park_on_driver(&self, driver: &mut Driver) -> ParkFlag { 100 match self 101 .state 102 .compare_exchange(IDLE, PARKED_ON_DRIVER, SeqCst, SeqCst) 103 { 104 Ok(_) => {} 105 Err(NOTIFIED) => { 106 self.state.swap(IDLE, SeqCst); 107 return ParkFlag::NotPark; 108 } 109 Err(actual) => panic!("inconsistent park state; actual = {actual}"), 110 } 111 112 let park_flag = driver.run(); 113 114 match self.state.swap(IDLE, SeqCst) { 115 // got notified by real io events or not 116 NOTIFIED => ParkFlag::NotPark, 117 PARKED_ON_DRIVER => park_flag, 118 n => panic!("inconsistent park_timeout state: {n}"), 119 } 120 } 121 122 // if duration is none, than park permanently park_on_condvar_timeout(&self, duration: Option<Duration>)123 fn park_on_condvar_timeout(&self, duration: Option<Duration>) { 124 let mut l = self.mutex.lock().unwrap(); 125 match self 126 .state 127 .compare_exchange(IDLE, PARKED_ON_CONDVAR, SeqCst, SeqCst) 128 { 129 Ok(_) => {} 130 Err(NOTIFIED) => { 131 // got a notification, exit parking 132 self.state.swap(IDLE, SeqCst); 133 return; 134 } 135 Err(actual) => panic!("inconsistent park state; actual = {actual}"), 136 } 137 138 loop { 139 let mut is_timed_out = false; 140 if let Some(duration) = duration { 141 let (lock, timeout_result) = self.condvar.wait_timeout(l, duration).unwrap(); 142 is_timed_out = timeout_result.timed_out(); 143 l = lock; 144 } else { 145 l = self.condvar.wait(l).unwrap(); 146 } 147 148 if self 149 .state 150 .compare_exchange(NOTIFIED, IDLE, SeqCst, SeqCst) 151 .is_ok() 152 { 153 // got a notification, finish parking 154 return; 155 } 156 157 if is_timed_out { 158 self.state.store(IDLE, SeqCst); 159 return; 160 } 161 // got spurious wakeup, go back to park again 162 } 163 } 164 unpark(&self, handle: Arc<Handle>)165 fn unpark(&self, handle: Arc<Handle>) { 166 match self.state.swap(NOTIFIED, SeqCst) { 167 IDLE | NOTIFIED => {} 168 PARKED_ON_CONDVAR => { 169 drop(self.mutex.lock()); 170 self.condvar.notify_one(); 171 } 172 PARKED_ON_DRIVER => handle.wake(), 173 actual => panic!("inconsistent state in unpark; actual = {actual}"), 174 } 175 } 176 get_driver(&self) -> &Arc<Mutex<Driver>>177 pub(crate) fn get_driver(&self) -> &Arc<Mutex<Driver>> { 178 &self.driver 179 } 180 181 #[cfg(feature = "metrics")] get_state(&self) -> usize182 pub(crate) fn get_state(&self) -> usize { 183 self.state.load(SeqCst) 184 } 185 release(&self)186 fn release(&self) { 187 self.condvar.notify_all(); 188 } 189 } 190