1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 //! Mutual exclusion locks 15 16 use std::cell::UnsafeCell; 17 use std::error::Error; 18 use std::fmt; 19 use std::fmt::{Display, Formatter}; 20 use std::ops::{Deref, DerefMut}; 21 22 use crate::sync::semaphore_inner::SemaphoreInner; 23 24 /// An async version of [`std::sync::Mutex`] 25 /// 26 /// Often it's considered as normal to use [`std::sync::Mutex`] on an 27 /// asynchronous environment. The primal purpose of this async mutex is to 28 /// protect shared reference of io, which contains a lot await point during 29 /// reading and writing. If you only wants to protect a data across 30 /// different threads, [`std::sync::Mutex`] will probably gain you better 31 /// performance. 32 /// 33 /// When using across different futures, users need to wrap the mutex inside an 34 /// Arc, just like the use of [`std::sync::Mutex`]. 35 pub struct Mutex<T: ?Sized> { 36 /// Semaphore to provide mutual exclusion 37 sem: SemaphoreInner, 38 /// The data protected by this mutex 39 data: UnsafeCell<T>, 40 } 41 42 unsafe impl<T: ?Sized + Send> Send for Mutex<T> {} 43 unsafe impl<T: ?Sized + Send> Sync for Mutex<T> {} 44 45 /// Error of Mutex 46 #[derive(Debug, Eq, PartialEq, Clone)] 47 pub struct LockError; 48 49 impl Display for LockError { fmt(&self, f: &mut Formatter<'_>) -> fmt::Result50 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { 51 write!(f, "Try lock error.") 52 } 53 } 54 55 impl Error for LockError {} 56 57 impl<T: Sized> Mutex<T> { 58 /// Creates a mutex protecting the data passed in. 59 /// 60 /// # Examples 61 /// 62 /// ``` 63 /// use ylong_runtime::sync::mutex::Mutex; 64 /// 65 /// let _a = Mutex::new(2); 66 /// ``` new(t: T) -> Mutex<T>67 pub fn new(t: T) -> Mutex<T> { 68 Mutex { 69 sem: SemaphoreInner::new(1).unwrap(), 70 data: UnsafeCell::new(t), 71 } 72 } 73 } 74 75 impl<T: ?Sized> Mutex<T> { 76 /// Locks the mutex. 77 /// 78 /// If the mutex is already held by others, asynchronously waits for it to 79 /// release. 80 /// 81 /// # Examples 82 /// 83 /// ``` 84 /// use std::sync::Arc; 85 /// 86 /// use ylong_runtime::sync::mutex::Mutex; 87 /// 88 /// let _res = ylong_runtime::block_on(async { 89 /// let lock = Arc::new(Mutex::new(2)); 90 /// let mut n = lock.lock().await; 91 /// *n += 1; 92 /// assert_eq!(*n, 3); 93 /// }); 94 /// ``` lock(&self) -> MutexGuard<'_, T>95 pub async fn lock(&self) -> MutexGuard<'_, T> { 96 // The result of `acquire()` will be `Err()` only when the semaphore is closed. 97 // `Mutex` will not close, so the result of `acquire()` must be `Ok(())`. 98 self.sem.acquire().await.unwrap(); 99 MutexGuard(self) 100 } 101 102 /// Attempts to get the mutex. 103 /// 104 /// If the lock is already held by others, LockError will be returned. 105 /// Otherwise, the mutex guard will be returned. 106 /// 107 /// # Examples 108 /// 109 /// ``` 110 /// use std::sync::Arc; 111 /// 112 /// use ylong_runtime::sync::mutex::Mutex; 113 /// 114 /// let _res = ylong_runtime::block_on(async { 115 /// let mutex = Arc::new(Mutex::new(0)); 116 /// match mutex.try_lock() { 117 /// Ok(lock) => println!("{}", lock), 118 /// Err(_) => {} 119 /// }; 120 /// }); 121 /// ``` try_lock(&self) -> Result<MutexGuard<'_, T>, LockError>122 pub fn try_lock(&self) -> Result<MutexGuard<'_, T>, LockError> { 123 match self.sem.try_acquire() { 124 Ok(_) => Ok(MutexGuard(self)), 125 Err(_) => Err(LockError), 126 } 127 } 128 129 /// Gets the mutable reference of the data protected by the lock without 130 /// actually holding the lock. 131 /// 132 /// This method takes the mutable reference of the mutex, so there is no 133 /// need to actually lock the mutex -- the mutable borrow statically 134 /// guarantees no locks exist. get_mut(&mut self) -> &mut T135 pub fn get_mut(&mut self) -> &mut T { 136 unsafe { &mut *self.data.get() } 137 } 138 } 139 140 /// Mutex guard to access the data after holding the mutex. 141 pub struct MutexGuard<'a, T: ?Sized>(&'a Mutex<T>); 142 143 impl<T: ?Sized> MutexGuard<'_, T> { 144 // Unlocks the mutex. Wakes the first future waiting for the mutex. unlock(&mut self)145 fn unlock(&mut self) { 146 self.0.sem.release(); 147 } 148 } 149 150 unsafe impl<T: ?Sized + Send + Sync> Sync for MutexGuard<'_, T> {} 151 152 /// The mutex will be released after the mutex guard is dropped. 153 impl<T: ?Sized> Drop for MutexGuard<'_, T> { drop(&mut self)154 fn drop(&mut self) { 155 self.unlock(); 156 } 157 } 158 159 impl<T: ?Sized + fmt::Debug> fmt::Debug for MutexGuard<'_, T> { fmt(&self, f: &mut Formatter<'_>) -> fmt::Result160 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { 161 fmt::Debug::fmt(&**self, f) 162 } 163 } 164 165 impl<T: ?Sized + Display> Display for MutexGuard<'_, T> { fmt(&self, f: &mut Formatter<'_>) -> fmt::Result166 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { 167 Display::fmt(&**self, f) 168 } 169 } 170 171 impl<T: ?Sized> Deref for MutexGuard<'_, T> { 172 type Target = T; deref(&self) -> &Self::Target173 fn deref(&self) -> &Self::Target { 174 unsafe { &*self.0.data.get() } 175 } 176 } 177 178 impl<T: ?Sized> DerefMut for MutexGuard<'_, T> { deref_mut(&mut self) -> &mut T179 fn deref_mut(&mut self) -> &mut T { 180 unsafe { &mut *self.0.data.get() } 181 } 182 } 183 184 #[cfg(test)] 185 mod tests { 186 use std::sync::atomic::{AtomicBool, Ordering}; 187 use std::sync::Arc; 188 use std::thread; 189 use std::time::Duration; 190 191 use super::*; 192 use crate::{block_on, spawn}; 193 194 /// UT test cases for Mutex::new() interface 195 /// 196 /// # Brief 197 /// 1. Creating a Concurrent Mutual Exclusion Lock 198 /// 2. Verify the state of the lock object and the size of the internal 199 /// value of the lock object 200 #[test] ut_mutex_new_01()201 fn ut_mutex_new_01() { 202 let lock = Mutex::new(10); 203 assert_eq!(lock.data.into_inner(), 10); 204 } 205 206 /// UT test cases for Mutex::lock() interface 207 /// 208 /// # Brief 209 /// 1. Creating a concurrent read/write lock 210 /// 2. Modification of data in the concurrent mutex lock 211 /// 3. Check the value in the changed concurrent mutex lock and check the 212 /// status bit of the lock 213 #[test] ut_mutex_lock_01()214 fn ut_mutex_lock_01() { 215 let mutex = Mutex::new(10); 216 block_on(async { 217 let mut lock = mutex.lock().await; 218 *lock += 1; 219 assert_eq!(*lock, 11); 220 }); 221 } 222 223 /// UT test cases for Mutex::try_lock() interface 224 /// 225 /// # Brief 226 /// 1. Creating a Concurrent Mutual Exclusion Lock 227 /// 2. Call try_lock() to try to get the lock 228 /// 3. Operation on in-lock values 229 /// 4. Calibrate in-lock values 230 #[test] ut_mutex_try_lock_01()231 fn ut_mutex_try_lock_01() { 232 let mutex = Mutex::new(10); 233 let mut lock = mutex.try_lock().unwrap(); 234 *lock += 100; 235 assert_eq!(*lock, 110); 236 } 237 238 /// UT test cases for Mutex::try_lock() interface 239 /// 240 /// # Brief 241 /// 1. Creating a Concurrent Mutual Exclusion Lock 242 /// 2. First build a concurrent process to hold the lock and sleep after 243 /// obtaining the lock to hold the lock for a long time 244 /// 3. Call try_lock() to try to get a lock 245 /// 4. Check try_lock return value is None 246 #[test] ut_mutex_try_lock_02()247 fn ut_mutex_try_lock_02() { 248 let mutex = Arc::new(Mutex::new(10)); 249 let mutex1 = mutex.clone(); 250 let flag = Arc::new(AtomicBool::new(true)); 251 let flag_clone = flag.clone(); 252 let lock_flag = Arc::new(AtomicBool::new(true)); 253 let lock_flag_clone = lock_flag.clone(); 254 255 spawn(async move { 256 let _lock = mutex1.lock().await; 257 lock_flag_clone.store(false, Ordering::SeqCst); 258 while flag_clone.load(Ordering::SeqCst) { 259 thread::sleep(Duration::from_millis(1)); 260 } 261 }); 262 while lock_flag.load(Ordering::SeqCst) { 263 thread::sleep(Duration::from_millis(1)); 264 } 265 let lock2 = mutex.try_lock(); 266 assert!(lock2.is_err()); 267 flag.store(false, Ordering::SeqCst); 268 } 269 270 /// UT test cases for Mutex::unlock() interface 271 /// 272 /// # Brief 273 /// 1. Creating a Concurrent Mutual Exclusion Lock 274 /// 2. Perform locking operation 275 /// 3. Call the drop() method to release the lock 276 /// 4. Verify the status of the concurrent mutex lock before, after and 277 /// after unlocking 278 #[test] ut_mutex_unlock_01()279 fn ut_mutex_unlock_01() { 280 let mutex = Mutex::new(10); 281 block_on(async move { 282 let lock = mutex.lock().await; 283 assert!(mutex.try_lock().is_err()); 284 drop(lock); 285 assert!(mutex.try_lock().is_ok()); 286 }); 287 } 288 } 289