1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 //! An asynchronous version of [`std::sync::RwLock`] 15 16 use std::cell::UnsafeCell; 17 use std::fmt; 18 use std::ops::{Deref, DerefMut}; 19 use std::sync::atomic::AtomicI64; 20 use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; 21 22 use crate::sync::semaphore_inner::SemaphoreInner; 23 use crate::sync::LockError; 24 25 const MAX_READS: i64 = i64::MAX >> 2; 26 27 /// An asynchronous version of [`std::sync::RwLock`]. 28 /// 29 /// Rwlock allows multiple readers or a single writer to operate concurrently. 30 /// Readers are only allowed to read the data, but the writer is the only one 31 /// can change the data inside. 32 /// 33 /// This Rwlock's policy is writer first, to prevent writers from starving. 34 /// 35 /// # Examples 36 /// 37 /// ``` 38 /// use ylong_runtime::sync::rwlock::RwLock; 39 /// 40 /// ylong_runtime::block_on(async { 41 /// let lock = RwLock::new(0); 42 /// 43 /// // Can have multiple read locks at the same time 44 /// let r1 = lock.read().await; 45 /// let r2 = lock.read().await; 46 /// assert_eq!(*r1, 0); 47 /// assert_eq!(*r2, 0); 48 /// drop((r1, r2)); 49 /// 50 /// // Only one write lock at a time 51 /// let mut w = lock.write().await; 52 /// *w += 1; 53 /// assert_eq!(*w, 1); 54 /// }); 55 /// ``` 56 pub struct RwLock<T: ?Sized> { 57 read_sem: SemaphoreInner, 58 write_sem: SemaphoreInner, 59 write_mutex: SemaphoreInner, 60 read_count: AtomicI64, 61 read_wait: AtomicI64, 62 data: UnsafeCell<T>, 63 } 64 65 unsafe impl<T: ?Sized + Send> Send for RwLock<T> {} 66 unsafe impl<T: ?Sized + Send + Sync> Sync for RwLock<T> {} 67 68 impl<T: Sized> RwLock<T> { 69 /// Creates a new RwLock. `T` is the data that needs to be protected 70 /// by this RwLock. 71 /// 72 /// # Examples 73 /// 74 /// ``` 75 /// use ylong_runtime::sync::rwlock::RwLock; 76 /// 77 /// let lock = RwLock::new(0); 78 /// ``` new(t: T) -> RwLock<T>79 pub fn new(t: T) -> RwLock<T> { 80 RwLock { 81 read_sem: SemaphoreInner::new(0).unwrap(), 82 write_sem: SemaphoreInner::new(0).unwrap(), 83 write_mutex: SemaphoreInner::new(1).unwrap(), 84 read_count: AtomicI64::new(0), 85 read_wait: AtomicI64::new(0), 86 data: UnsafeCell::new(t), 87 } 88 } 89 } 90 91 impl<T: ?Sized> RwLock<T> { 92 /// Asynchronously acquires the read lock. 93 /// 94 /// If there is a writer holding the write lock, then this method will wait 95 /// asynchronously for the write lock to get released. 96 /// 97 /// But if the write lock is not held, it's ok for multiple readers to hold 98 /// the read lock concurrently. 99 /// 100 /// 101 /// 102 /// # Examples 103 /// 104 /// ``` 105 /// use ylong_runtime::sync::rwlock::RwLock; 106 /// 107 /// ylong_runtime::block_on(async { 108 /// let lock = RwLock::new(0); 109 /// let r1 = lock.read().await; 110 /// assert_eq!(*r1, 0); 111 /// }); 112 /// ``` read(&self) -> RwLockReadGuard<'_, T>113 pub async fn read(&self) -> RwLockReadGuard<'_, T> { 114 if self.read_count.fetch_add(1, Release) < 0 { 115 // The result of `acquire()` will be `Err()` only when the semaphore is closed. 116 // `RwLock` will not close, so the result of `acquire()` must be `Ok(())`. 117 self.read_sem.acquire().await.unwrap(); 118 } 119 RwLockReadGuard(self) 120 } 121 122 /// Attempts to get the read lock. If another writer is holding the write 123 /// lock, then None will be returned. Otherwise, the ReadMutexGuard will 124 /// be returned. 125 /// 126 /// # Examples 127 /// 128 /// ``` 129 /// use ylong_runtime::sync::rwlock::RwLock; 130 /// 131 /// let lock = RwLock::new(0); 132 /// let r1 = lock.try_read().unwrap(); 133 /// assert_eq!(*r1, 0); 134 /// ``` try_read(&self) -> Result<RwLockReadGuard<'_, T>, LockError>135 pub fn try_read(&self) -> Result<RwLockReadGuard<'_, T>, LockError> { 136 let mut read_count = self.read_count.load(Acquire); 137 loop { 138 if read_count < 0 { 139 return Err(LockError); 140 } else { 141 match self.read_count.compare_exchange_weak( 142 read_count, 143 read_count + 1, 144 AcqRel, 145 Acquire, 146 ) { 147 Ok(_) => { 148 return Ok(RwLockReadGuard(self)); 149 } 150 Err(curr) => { 151 read_count = curr; 152 } 153 } 154 } 155 } 156 } 157 158 /// Asynchronously acquires the write lock. 159 /// 160 /// If there is other readers or writers, then this method will wait 161 /// asynchronously for them to get released. 162 /// 163 /// # Examples 164 /// 165 /// ``` 166 /// use ylong_runtime::sync::rwlock::RwLock; 167 /// 168 /// ylong_runtime::block_on(async { 169 /// let lock = RwLock::new(0); 170 /// let mut r1 = lock.write().await; 171 /// *r1 += 1; 172 /// assert_eq!(*r1, 1); 173 /// }); 174 /// ``` write(&self) -> RwLockWriteGuard<'_, T>175 pub async fn write(&self) -> RwLockWriteGuard<'_, T> { 176 // The result of `acquire()` will be `Err()` only when the semaphore is closed. 177 // `RwLock` will not close, so the result of `acquire()` must be `Ok(())`. 178 self.write_mutex.acquire().await.unwrap(); 179 let read_count = self.read_count.fetch_sub(MAX_READS, Release); 180 // If the `read_count` is not 0, it indicates that there is currently a reader 181 // holding a read lock. If the `read_wait` is 0 after addition, it 182 // indicates that all readers have been dropped. 183 if read_count >= 0 && self.read_wait.fetch_add(read_count, Release) != -read_count { 184 self.write_sem.acquire().await.unwrap(); 185 } 186 RwLockWriteGuard(self) 187 } 188 189 /// Attempts to acquire the write lock. 190 /// 191 /// If any other task holds the read/write lock, None will be returned. 192 /// 193 /// # Examples 194 /// 195 /// ``` 196 /// use ylong_runtime::sync::rwlock::RwLock; 197 /// 198 /// let lock = RwLock::new(0); 199 /// let mut r1 = lock.try_write().unwrap(); 200 /// *r1 += 1; 201 /// assert_eq!(*r1, 1); 202 /// ``` try_write(&self) -> Result<RwLockWriteGuard<'_, T>, LockError>203 pub fn try_write(&self) -> Result<RwLockWriteGuard<'_, T>, LockError> { 204 if self.write_mutex.try_acquire().is_err() { 205 return Err(LockError); 206 } 207 match self 208 .read_count 209 .compare_exchange(0, -MAX_READS, AcqRel, Acquire) 210 { 211 Ok(_) => Ok(RwLockWriteGuard(self)), 212 Err(_) => { 213 self.write_mutex.release(); 214 Err(LockError) 215 } 216 } 217 } 218 219 /// Consumes the lock, and returns the data protected by it. 220 /// 221 /// # Examples 222 /// 223 /// ``` 224 /// use ylong_runtime::sync::rwlock::RwLock; 225 /// 226 /// let lock = RwLock::new(0); 227 /// assert_eq!(lock.into_inner(), 0); 228 /// ``` into_inner(self) -> T where T: Sized,229 pub fn into_inner(self) -> T 230 where 231 T: Sized, 232 { 233 self.data.into_inner() 234 } 235 236 /// Gets the mutable reference of the data protected by the lock. 237 /// 238 /// This method takes the mutable reference of the RwLock, so there is no 239 /// need to actually lock the RwLock -- the mutable borrow statically 240 /// guarantees no locks exist. 241 /// ``` 242 /// use ylong_runtime::sync::rwlock::RwLock; 243 /// 244 /// ylong_runtime::block_on(async { 245 /// let mut lock = RwLock::new(0); 246 /// *lock.get_mut() = 10; 247 /// assert_eq!(*lock.write().await, 10); 248 /// }); 249 /// ``` get_mut(&mut self) -> &mut T250 pub fn get_mut(&mut self) -> &mut T { 251 unsafe { &mut *self.data.get() } 252 } 253 } 254 255 /// Read guard to access the data after holding the mutex. 256 pub struct RwLockReadGuard<'a, T: ?Sized>(&'a RwLock<T>); 257 258 unsafe impl<T: ?Sized + Send> Send for RwLockReadGuard<'_, T> {} 259 unsafe impl<T: ?Sized + Sync> Sync for RwLockReadGuard<'_, T> {} 260 261 /// Releases the read lock. Wakes any waiting writer if it's the last one 262 /// holding the read lock. 263 impl<T: ?Sized> RwLockReadGuard<'_, T> { unlock(&mut self)264 fn unlock(&mut self) { 265 if self.0.read_count.fetch_sub(1, Release) < 0 266 && self.0.read_wait.fetch_sub(1, Release) == 1 267 { 268 self.0.write_sem.release(); 269 } 270 } 271 } 272 273 /// Unlock the read lock when ReadGuard is dropped. 274 impl<T: ?Sized> Drop for RwLockReadGuard<'_, T> { drop(&mut self)275 fn drop(&mut self) { 276 self.unlock(); 277 } 278 } 279 280 impl<T: ?Sized> Deref for RwLockReadGuard<'_, T> { 281 type Target = T; 282 deref(&self) -> &T283 fn deref(&self) -> &T { 284 unsafe { &*self.0.data.get() } 285 } 286 } 287 288 impl<T: ?Sized + fmt::Debug> fmt::Debug for RwLockReadGuard<'_, T> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result289 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 290 fmt::Debug::fmt(&**self, f) 291 } 292 } 293 294 impl<T: ?Sized + fmt::Display> fmt::Display for RwLockReadGuard<'_, T> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result295 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 296 fmt::Display::fmt(&**self, f) 297 } 298 } 299 300 /// RwLock write guard 301 pub struct RwLockWriteGuard<'a, T: ?Sized>(&'a RwLock<T>); 302 303 unsafe impl<T: ?Sized + Send> Send for RwLockWriteGuard<'_, T> {} 304 unsafe impl<T: ?Sized + Sync> Sync for RwLockWriteGuard<'_, T> {} 305 306 /// Wakes all waiting readers first and releases the write lock when WriteGuard 307 /// is dropped. 308 impl<T: ?Sized> Drop for RwLockWriteGuard<'_, T> { drop(&mut self)309 fn drop(&mut self) { 310 let read_count = self.0.read_count.fetch_add(MAX_READS, Release) + MAX_READS; 311 self.0.read_sem.release_multi(read_count as usize); 312 self.0.write_mutex.release(); 313 } 314 } 315 316 impl<T: ?Sized + fmt::Debug> fmt::Debug for RwLockWriteGuard<'_, T> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result317 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 318 fmt::Debug::fmt(&**self, f) 319 } 320 } 321 322 impl<T: ?Sized + fmt::Display> fmt::Display for RwLockWriteGuard<'_, T> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result323 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 324 fmt::Display::fmt(&**self, f) 325 } 326 } 327 328 impl<T: ?Sized> Deref for RwLockWriteGuard<'_, T> { 329 type Target = T; deref(&self) -> &Self::Target330 fn deref(&self) -> &Self::Target { 331 unsafe { &*self.0.data.get() } 332 } 333 } 334 335 impl<T: ?Sized> DerefMut for RwLockWriteGuard<'_, T> { deref_mut(&mut self) -> &mut T336 fn deref_mut(&mut self) -> &mut T { 337 unsafe { &mut *self.0.data.get() } 338 } 339 } 340 341 #[cfg(test)] 342 mod tests { 343 use std::sync::Arc; 344 345 use super::*; 346 use crate::{block_on, spawn}; 347 348 /// UT test cases for Rwlock::new() 349 /// 350 /// # Brief 351 /// 1. Create a concurrent read/write lock with structure and value as input 352 /// parameters 353 /// 2. Verify the contents of the read/write lock 354 #[test] ut_rwlock_new_01()355 fn ut_rwlock_new_01() { 356 pub struct Test { 357 flag: bool, 358 num: usize, 359 } 360 block_on(async { 361 let lock = RwLock::new(Test { flag: true, num: 1 }); 362 assert!(lock.read().await.flag); 363 assert_eq!(lock.read().await.num, 1); 364 let lock2 = RwLock::new(0); 365 assert_eq!(*lock2.read().await, 0); 366 }); 367 } 368 369 /// UT test cases for Rwlock::read() 370 /// 371 /// # Brief 372 /// 1. Creating a concurrent read/write lock 373 /// 2. Calling the read() function 374 /// 3. Verify the value of the read() function dereference 375 #[test] ut_rwlock_read_01()376 fn ut_rwlock_read_01() { 377 block_on(async { 378 let lock = RwLock::new(100); 379 let a = lock.read().await; 380 assert_eq!(*a, 100); 381 }); 382 } 383 384 /// UT test cases for Rwlock::read() 385 /// 386 /// # Brief 387 /// 1. Creating a concurrent read/write lock 388 /// 2. Call the write() function to make changes to the concurrent 389 /// read/write lock data 390 /// 3. Call the read() function to verify the value in the read/write lock 391 /// of the concurrent process 392 #[test] ut_rwlock_read_02()393 fn ut_rwlock_read_02() { 394 let lock = Arc::new(RwLock::new(100)); 395 let lock2 = lock.clone(); 396 397 block_on(spawn(async move { 398 let mut loopmun = lock2.write().await; 399 *loopmun += 1; 400 })) 401 .unwrap(); 402 block_on(async { 403 let a = lock.read().await; 404 assert_eq!(*a, 101); 405 }); 406 } 407 408 /// UT test cases for Rwlock::try_read() 409 /// 410 /// # Brief 411 /// 1. Creating a concurrent read/write lock 412 /// 2. Call try_read() 413 /// 3. Verify the value of the return value dereference 414 #[test] ut_rwlock_try_read_01()415 fn ut_rwlock_try_read_01() { 416 let lock = RwLock::new(100); 417 let res = lock.try_read().unwrap(); 418 assert_eq!(*res, 100); 419 } 420 421 /// UT test cases for Rwlock::try_read() 422 /// 423 /// # Brief 424 /// 1. Creating a concurrent read/write lock 425 /// 2. Create a thread to call the write method to hold the lock, and then 426 /// sleep to hold the lock for a long time 427 /// 3. Call try_read() to try to get a lock 428 /// 4. Check the try_read return value 429 #[test] ut_rwlock_try_read_02()430 fn ut_rwlock_try_read_02() { 431 let lock = Arc::new(RwLock::new(100)); 432 let mut a = lock.try_write().unwrap(); 433 *a += 1; 434 let res = lock.try_read(); 435 assert!(res.is_err()); 436 *a += 1; 437 drop(a); 438 let res2 = lock.try_read(); 439 assert!(res2.is_ok()); 440 } 441 442 /// UT test cases for Rwlock::write() 443 /// 444 /// # Brief 445 /// 1. Creating a concurrent read/write lock 446 /// 2. Create a call to the write interface to modify the value inside the 447 /// concurrent read/write lock 448 /// 3. Verify the value of the concurrent read/write lock 449 #[test] ut_rwlock_write_01()450 fn ut_rwlock_write_01() { 451 let lock = Arc::new(RwLock::new(100)); 452 block_on(async { 453 let mut a = lock.write().await; 454 *a += 100; 455 assert_eq!(*a, 200); 456 }); 457 } 458 459 /// UT test cases for Rwlock::write() 460 /// 461 /// # Brief 462 /// 1. Creating a concurrent read/write lock 463 /// 2. First create a thread to obtain a write lock, modify the data in the 464 /// concurrent read/write lock, and then hibernate to ensure that the 465 /// lock is held for a long time 466 /// 3. Create two co-processes one to get a read lock and one to get a write 467 /// lock, so that there is both a reader and a writer requesting the lock 468 /// 4. Verify the value inside the concurrent read/write lock when the 469 /// concurrent read/write lock is obtained 470 #[test] ut_rwlock_write_test_02()471 fn ut_rwlock_write_test_02() { 472 let lock = Arc::new(RwLock::new(100)); 473 let lock2 = lock.clone(); 474 let lock3 = lock.clone(); 475 let lock4 = lock; 476 477 let handle = spawn(async move { 478 let mut aa = lock2.write().await; 479 *aa += 100; 480 }); 481 let handle1 = spawn(async move { 482 let mut aa = lock4.write().await; 483 *aa += 100; 484 }); 485 block_on(handle).unwrap(); 486 block_on(handle1).unwrap(); 487 let handle2 = spawn(async move { 488 let aa = lock3.read().await; 489 assert_eq!(*aa, 300); 490 }); 491 block_on(handle2).unwrap(); 492 } 493 494 /// UT test cases for Rwlock::try_write() 495 /// 496 /// # Brief 497 /// 1. Creating a concurrent read/write lock 498 /// 2. Call try_write() to try to get a write lock and modify the value in 499 /// it 500 /// 3. Verify the value in the read/write lock of the concurrent process 501 #[test] ut_rwlock_try_write_01()502 fn ut_rwlock_try_write_01() { 503 let lock = RwLock::new(100); 504 let mut aa = lock.try_write().unwrap(); 505 *aa += 100; 506 assert_eq!(*aa, 200); 507 } 508 509 /// UT test cases for Rwlock::try_write() 510 /// 511 /// # Brief 512 /// 1. Creating a concurrent read/write lock 513 /// 2. Execute command cargo test ut_rwlock_try_write_02 514 #[test] ut_rwlock_try_write_02()515 fn ut_rwlock_try_write_02() { 516 let lock = Arc::new(RwLock::new(100)); 517 let mut a = lock.try_write().unwrap(); 518 *a += 1; 519 let res = lock.try_write(); 520 assert!(res.is_err()); 521 *a += 1; 522 drop(a); 523 let res2 = lock.try_write(); 524 assert!(res2.is_ok()); 525 } 526 527 /// UT test cases for Rwlock::into_inner() 528 /// 529 /// # Brief 530 /// 1. Add a temporary library path to the project directory export 531 /// LD_LIBRARY_PATH=$(pwd)/platform 532 /// 2. Execute command cargo test ut_rwlock_into_inner_01 533 #[test] ut_rwlock_into_inner_01()534 fn ut_rwlock_into_inner_01() { 535 let lock = RwLock::new(10); 536 assert_eq!(lock.into_inner(), 10); 537 } 538 } 539