• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::sync::atomic::AtomicUsize;
15 use std::sync::atomic::Ordering::SeqCst;
16 use std::sync::{Arc, Condvar, Mutex};
17 use std::thread;
18 use std::time::Duration;
19 
20 use crate::executor::driver::{Driver, Handle, ParkFlag};
21 
22 #[derive(Clone)]
23 pub(crate) struct Parker {
24     inner: Arc<Inner>,
25 }
26 
27 struct Inner {
28     state: AtomicUsize,
29     mutex: Mutex<()>,
30     condvar: Condvar,
31     driver: Arc<Mutex<Driver>>,
32 }
33 
/// Not parked and no pending notification; the state a new parker starts in.
const IDLE: usize = 0;
/// The thread is (about to be) sleeping on the condition variable.
const PARKED_ON_CONDVAR: usize = 1;
/// The thread is (about to be) running the driver.
const PARKED_ON_DRIVER: usize = 2;
/// `unpark` has been called and the notification has not been consumed yet.
const NOTIFIED: usize = 3;
38 
39 impl Parker {
new(driver: Arc<Mutex<Driver>>) -> Parker40     pub(crate) fn new(driver: Arc<Mutex<Driver>>) -> Parker {
41         Parker {
42             inner: Arc::new(Inner {
43                 state: AtomicUsize::new(IDLE),
44                 mutex: Mutex::new(()),
45                 condvar: Condvar::new(),
46                 driver,
47             }),
48         }
49     }
50 
park(&mut self)51     pub(crate) fn park(&mut self) {
52         self.inner.park()
53     }
54 
unpark(&self, handle: Arc<Handle>)55     pub(crate) fn unpark(&self, handle: Arc<Handle>) {
56         self.inner.unpark(handle);
57     }
58 
get_driver(&self) -> &Arc<Mutex<Driver>>59     pub(crate) fn get_driver(&self) -> &Arc<Mutex<Driver>> {
60         self.inner.get_driver()
61     }
62 
release(&self)63     pub(crate) fn release(&self) {
64         self.inner.release();
65     }
66 
67     #[cfg(feature = "metrics")]
get_state(&self) -> usize68     pub(crate) fn get_state(&self) -> usize {
69         self.inner.get_state()
70     }
71 }
72 
73 impl Inner {
park(&self)74     fn park(&self) {
75         // loop to reduce the chance of parking the thread
76         for _ in 0..3 {
77             if self
78                 .state
79                 .compare_exchange(NOTIFIED, IDLE, SeqCst, SeqCst)
80                 .is_ok()
81             {
82                 return;
83             }
84             thread::yield_now();
85         }
86 
87         let mut park_flag = ParkFlag::Park;
88         if let Ok(mut driver) = self.driver.try_lock() {
89             park_flag = self.park_on_driver(&mut driver);
90         }
91 
92         match park_flag {
93             ParkFlag::NotPark => {}
94             ParkFlag::Park => self.park_on_condvar_timeout(None),
95             ParkFlag::ParkTimeout(duration) => self.park_on_condvar_timeout(Some(duration)),
96         }
97     }
98 
park_on_driver(&self, driver: &mut Driver) -> ParkFlag99     fn park_on_driver(&self, driver: &mut Driver) -> ParkFlag {
100         match self
101             .state
102             .compare_exchange(IDLE, PARKED_ON_DRIVER, SeqCst, SeqCst)
103         {
104             Ok(_) => {}
105             Err(NOTIFIED) => {
106                 self.state.swap(IDLE, SeqCst);
107                 return ParkFlag::NotPark;
108             }
109             Err(actual) => panic!("inconsistent park state; actual = {actual}"),
110         }
111 
112         let park_flag = driver.run();
113 
114         match self.state.swap(IDLE, SeqCst) {
115             // got notified by real io events or not
116             NOTIFIED => ParkFlag::NotPark,
117             PARKED_ON_DRIVER => park_flag,
118             n => panic!("inconsistent park_timeout state: {n}"),
119         }
120     }
121 
122     // if duration is none, than park permanently
park_on_condvar_timeout(&self, duration: Option<Duration>)123     fn park_on_condvar_timeout(&self, duration: Option<Duration>) {
124         let mut l = self.mutex.lock().unwrap();
125         match self
126             .state
127             .compare_exchange(IDLE, PARKED_ON_CONDVAR, SeqCst, SeqCst)
128         {
129             Ok(_) => {}
130             Err(NOTIFIED) => {
131                 // got a notification, exit parking
132                 self.state.swap(IDLE, SeqCst);
133                 return;
134             }
135             Err(actual) => panic!("inconsistent park state; actual = {actual}"),
136         }
137 
138         loop {
139             let mut is_timed_out = false;
140             if let Some(duration) = duration {
141                 let (lock, timeout_result) = self.condvar.wait_timeout(l, duration).unwrap();
142                 is_timed_out = timeout_result.timed_out();
143                 l = lock;
144             } else {
145                 l = self.condvar.wait(l).unwrap();
146             }
147 
148             if self
149                 .state
150                 .compare_exchange(NOTIFIED, IDLE, SeqCst, SeqCst)
151                 .is_ok()
152             {
153                 // got a notification, finish parking
154                 return;
155             }
156 
157             if is_timed_out {
158                 self.state.store(IDLE, SeqCst);
159                 return;
160             }
161             // got spurious wakeup, go back to park again
162         }
163     }
164 
unpark(&self, handle: Arc<Handle>)165     fn unpark(&self, handle: Arc<Handle>) {
166         match self.state.swap(NOTIFIED, SeqCst) {
167             IDLE | NOTIFIED => {}
168             PARKED_ON_CONDVAR => {
169                 drop(self.mutex.lock());
170                 self.condvar.notify_one();
171             }
172             PARKED_ON_DRIVER => handle.wake(),
173             actual => panic!("inconsistent state in unpark; actual = {actual}"),
174         }
175     }
176 
get_driver(&self) -> &Arc<Mutex<Driver>>177     pub(crate) fn get_driver(&self) -> &Arc<Mutex<Driver>> {
178         &self.driver
179     }
180 
181     #[cfg(feature = "metrics")]
get_state(&self) -> usize182     pub(crate) fn get_state(&self) -> usize {
183         self.state.load(SeqCst)
184     }
185 
release(&self)186     fn release(&self) {
187         self.condvar.notify_all();
188     }
189 }
190