• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 //! An asynchronous version of [`std::sync::RwLock`]
15 
16 use std::cell::UnsafeCell;
17 use std::fmt;
18 use std::ops::{Deref, DerefMut};
19 use std::sync::atomic::AtomicI64;
20 use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
21 
22 use crate::sync::semaphore_inner::SemaphoreInner;
23 use crate::sync::LockError;
24 
/// Bias subtracted from `read_count` while a writer holds or is waiting for
/// the lock. It drives the counter negative, which is how readers detect the
/// writer; the bias is added back when the write guard is dropped.
const MAX_READS: i64 = i64::MAX >> 2;
26 
/// An asynchronous version of [`std::sync::RwLock`].
///
/// `RwLock` allows either multiple readers or a single writer to access the
/// protected data at any given time. Readers are only allowed to read the
/// data, while the writer is the only one that can change the data inside.
///
/// This lock's policy is writer-first, to prevent writers from starving.
///
/// # Examples
///
/// ```
/// use ylong_runtime::sync::rwlock::RwLock;
///
/// ylong_runtime::block_on(async {
///     let lock = RwLock::new(0);
///
///     // Can have multiple read locks at the same time
///     let r1 = lock.read().await;
///     let r2 = lock.read().await;
///     assert_eq!(*r1, 0);
///     assert_eq!(*r2, 0);
///     drop((r1, r2));
///
///     // Only one write lock at a time
///     let mut w = lock.write().await;
///     *w += 1;
///     assert_eq!(*w, 1);
/// });
/// ```
pub struct RwLock<T: ?Sized> {
    /// Readers that arrive while `read_count` is negative wait here; the
    /// write guard releases them when it is dropped.
    read_sem: SemaphoreInner,
    /// A writer waits here for the readers that were active when it arrived;
    /// the last of those readers releases it.
    write_sem: SemaphoreInner,
    /// Created with a value of 1 and acquired by every writer before it
    /// touches the counters, so only one writer proceeds at a time.
    write_mutex: SemaphoreInner,
    /// Number of readers, lowered by `MAX_READS` while a writer holds or is
    /// waiting for the lock.
    read_count: AtomicI64,
    /// Number of active readers the pending writer still has to wait for.
    read_wait: AtomicI64,
    /// The protected value.
    data: UnsafeCell<T>,
}
64 
// SAFETY: moving the lock to another thread moves the protected `T` with it,
// which is fine as long as `T: Send`.
unsafe impl<T: ?Sized + Send> Send for RwLock<T> {}
// SAFETY: a shared `&RwLock<T>` hands out `&T` to several readers at once
// (needs `T: Sync`) and `&mut T` to whichever thread takes the write lock
// (needs `T: Send`).
unsafe impl<T: ?Sized + Send + Sync> Sync for RwLock<T> {}
67 
68 impl<T: Sized> RwLock<T> {
69     /// Creates a new RwLock. `T` is the data that needs to be protected
70     /// by this RwLock.
71     ///
72     /// # Examples
73     ///
74     /// ```
75     /// use ylong_runtime::sync::rwlock::RwLock;
76     ///
77     /// let lock = RwLock::new(0);
78     /// ```
new(t: T) -> RwLock<T>79     pub fn new(t: T) -> RwLock<T> {
80         RwLock {
81             read_sem: SemaphoreInner::new(0).unwrap(),
82             write_sem: SemaphoreInner::new(0).unwrap(),
83             write_mutex: SemaphoreInner::new(1).unwrap(),
84             read_count: AtomicI64::new(0),
85             read_wait: AtomicI64::new(0),
86             data: UnsafeCell::new(t),
87         }
88     }
89 }
90 
91 impl<T: ?Sized> RwLock<T> {
92     /// Asynchronously acquires the read lock.
93     ///
94     /// If there is a writer holding the write lock, then this method will wait
95     /// asynchronously for the write lock to get released.
96     ///
97     /// But if the write lock is not held, it's ok for multiple readers to hold
98     /// the read lock concurrently.
99     ///
100     ///
101     ///
102     /// # Examples
103     ///
104     /// ```
105     /// use ylong_runtime::sync::rwlock::RwLock;
106     ///
107     /// ylong_runtime::block_on(async {
108     ///     let lock = RwLock::new(0);
109     ///     let r1 = lock.read().await;
110     ///     assert_eq!(*r1, 0);
111     /// });
112     /// ```
read(&self) -> RwLockReadGuard<'_, T>113     pub async fn read(&self) -> RwLockReadGuard<'_, T> {
114         if self.read_count.fetch_add(1, Release) < 0 {
115             // The result of `acquire()` will be `Err()` only when the semaphore is closed.
116             // `RwLock` will not close, so the result of `acquire()` must be `Ok(())`.
117             self.read_sem.acquire().await.unwrap();
118         }
119         RwLockReadGuard(self)
120     }
121 
122     /// Attempts to get the read lock. If another writer is holding the write
123     /// lock, then None will be returned. Otherwise, the ReadMutexGuard will
124     /// be returned.
125     ///
126     /// # Examples
127     ///
128     /// ```
129     /// use ylong_runtime::sync::rwlock::RwLock;
130     ///
131     /// let lock = RwLock::new(0);
132     /// let r1 = lock.try_read().unwrap();
133     /// assert_eq!(*r1, 0);
134     /// ```
try_read(&self) -> Result<RwLockReadGuard<'_, T>, LockError>135     pub fn try_read(&self) -> Result<RwLockReadGuard<'_, T>, LockError> {
136         let mut read_count = self.read_count.load(Acquire);
137         loop {
138             if read_count < 0 {
139                 return Err(LockError);
140             } else {
141                 match self.read_count.compare_exchange_weak(
142                     read_count,
143                     read_count + 1,
144                     AcqRel,
145                     Acquire,
146                 ) {
147                     Ok(_) => {
148                         return Ok(RwLockReadGuard(self));
149                     }
150                     Err(curr) => {
151                         read_count = curr;
152                     }
153                 }
154             }
155         }
156     }
157 
158     /// Asynchronously acquires the write lock.
159     ///
160     /// If there is other readers or writers, then this method will wait
161     /// asynchronously for them to get released.
162     ///
163     /// # Examples
164     ///
165     /// ```
166     /// use ylong_runtime::sync::rwlock::RwLock;
167     ///
168     /// ylong_runtime::block_on(async {
169     ///     let lock = RwLock::new(0);
170     ///     let mut r1 = lock.write().await;
171     ///     *r1 += 1;
172     ///     assert_eq!(*r1, 1);
173     /// });
174     /// ```
write(&self) -> RwLockWriteGuard<'_, T>175     pub async fn write(&self) -> RwLockWriteGuard<'_, T> {
176         // The result of `acquire()` will be `Err()` only when the semaphore is closed.
177         // `RwLock` will not close, so the result of `acquire()` must be `Ok(())`.
178         self.write_mutex.acquire().await.unwrap();
179         let read_count = self.read_count.fetch_sub(MAX_READS, Release);
180         // If the `read_count` is not 0, it indicates that there is currently a reader
181         // holding a read lock. If the `read_wait` is 0 after addition, it
182         // indicates that all readers have been dropped.
183         if read_count >= 0 && self.read_wait.fetch_add(read_count, Release) != -read_count {
184             self.write_sem.acquire().await.unwrap();
185         }
186         RwLockWriteGuard(self)
187     }
188 
189     /// Attempts to acquire the write lock.
190     ///
191     /// If any other task holds the read/write lock, None will be returned.
192     ///
193     /// # Examples
194     ///
195     /// ```
196     /// use ylong_runtime::sync::rwlock::RwLock;
197     ///
198     /// let lock = RwLock::new(0);
199     /// let mut r1 = lock.try_write().unwrap();
200     /// *r1 += 1;
201     /// assert_eq!(*r1, 1);
202     /// ```
try_write(&self) -> Result<RwLockWriteGuard<'_, T>, LockError>203     pub fn try_write(&self) -> Result<RwLockWriteGuard<'_, T>, LockError> {
204         if self.write_mutex.try_acquire().is_err() {
205             return Err(LockError);
206         }
207         match self
208             .read_count
209             .compare_exchange(0, -MAX_READS, AcqRel, Acquire)
210         {
211             Ok(_) => Ok(RwLockWriteGuard(self)),
212             Err(_) => {
213                 self.write_mutex.release();
214                 Err(LockError)
215             }
216         }
217     }
218 
219     /// Consumes the lock, and returns the data protected by it.
220     ///
221     /// # Examples
222     ///
223     /// ```
224     /// use ylong_runtime::sync::rwlock::RwLock;
225     ///
226     /// let lock = RwLock::new(0);
227     /// assert_eq!(lock.into_inner(), 0);
228     /// ```
into_inner(self) -> T where T: Sized,229     pub fn into_inner(self) -> T
230     where
231         T: Sized,
232     {
233         self.data.into_inner()
234     }
235 
236     /// Gets the mutable reference of the data protected by the lock.
237     ///
238     /// This method takes the mutable reference of the RwLock, so there is no
239     /// need to actually lock the RwLock -- the mutable borrow statically
240     /// guarantees no locks exist.
241     /// ```
242     /// use ylong_runtime::sync::rwlock::RwLock;
243     ///
244     /// ylong_runtime::block_on(async {
245     ///     let mut lock = RwLock::new(0);
246     ///     *lock.get_mut() = 10;
247     ///     assert_eq!(*lock.write().await, 10);
248     /// });
249     /// ```
get_mut(&mut self) -> &mut T250     pub fn get_mut(&mut self) -> &mut T {
251         unsafe { &mut *self.data.get() }
252     }
253 }
254 
/// Guard returned by [`RwLock::read`] and [`RwLock::try_read`].
///
/// It gives shared, read-only access to the protected data and releases the
/// read lock when it is dropped.
pub struct RwLockReadGuard<'a, T: ?Sized>(&'a RwLock<T>);
257 
// NOTE(review): a read guard only ever exposes `&T` (see its `Deref` impl),
// and several read guards may exist at once. Moving one of them to another
// thread therefore shares `T` across threads, which would normally call for
// `T: Sync` rather than `T: Send` — confirm that this bound is intended.
unsafe impl<T: ?Sized + Send> Send for RwLockReadGuard<'_, T> {}
// SAFETY: sharing the guard only shares `&T`, which is sound when `T: Sync`.
unsafe impl<T: ?Sized + Sync> Sync for RwLockReadGuard<'_, T> {}
260 
261 /// Releases the read lock. Wakes any waiting writer if it's the last one
262 /// holding the read lock.
263 impl<T: ?Sized> RwLockReadGuard<'_, T> {
unlock(&mut self)264     fn unlock(&mut self) {
265         if self.0.read_count.fetch_sub(1, Release) < 0
266             && self.0.read_wait.fetch_sub(1, Release) == 1
267         {
268             self.0.write_sem.release();
269         }
270     }
271 }
272 
273 /// Unlock the read lock when ReadGuard is dropped.
274 impl<T: ?Sized> Drop for RwLockReadGuard<'_, T> {
drop(&mut self)275     fn drop(&mut self) {
276         self.unlock();
277     }
278 }
279 
280 impl<T: ?Sized> Deref for RwLockReadGuard<'_, T> {
281     type Target = T;
282 
deref(&self) -> &T283     fn deref(&self) -> &T {
284         unsafe { &*self.0.data.get() }
285     }
286 }
287 
288 impl<T: ?Sized + fmt::Debug> fmt::Debug for RwLockReadGuard<'_, T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result289     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
290         fmt::Debug::fmt(&**self, f)
291     }
292 }
293 
294 impl<T: ?Sized + fmt::Display> fmt::Display for RwLockReadGuard<'_, T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result295     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
296         fmt::Display::fmt(&**self, f)
297     }
298 }
299 
/// Guard returned by [`RwLock::write`] and [`RwLock::try_write`].
///
/// It gives exclusive, mutable access to the protected data and releases the
/// write lock when it is dropped.
pub struct RwLockWriteGuard<'a, T: ?Sized>(&'a RwLock<T>);
302 
// SAFETY: the write guard is the only way to reach the data while it exists,
// so moving it to another thread moves exclusive access to `T`, which is
// sound when `T: Send`.
unsafe impl<T: ?Sized + Send> Send for RwLockWriteGuard<'_, T> {}
// SAFETY: sharing the guard only shares `&T`, which is sound when `T: Sync`.
unsafe impl<T: ?Sized + Sync> Sync for RwLockWriteGuard<'_, T> {}
305 
306 /// Wakes all waiting readers first and releases the write lock when WriteGuard
307 /// is dropped.
308 impl<T: ?Sized> Drop for RwLockWriteGuard<'_, T> {
drop(&mut self)309     fn drop(&mut self) {
310         let read_count = self.0.read_count.fetch_add(MAX_READS, Release) + MAX_READS;
311         self.0.read_sem.release_multi(read_count as usize);
312         self.0.write_mutex.release();
313     }
314 }
315 
316 impl<T: ?Sized + fmt::Debug> fmt::Debug for RwLockWriteGuard<'_, T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result317     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
318         fmt::Debug::fmt(&**self, f)
319     }
320 }
321 
322 impl<T: ?Sized + fmt::Display> fmt::Display for RwLockWriteGuard<'_, T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result323     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
324         fmt::Display::fmt(&**self, f)
325     }
326 }
327 
328 impl<T: ?Sized> Deref for RwLockWriteGuard<'_, T> {
329     type Target = T;
deref(&self) -> &Self::Target330     fn deref(&self) -> &Self::Target {
331         unsafe { &*self.0.data.get() }
332     }
333 }
334 
335 impl<T: ?Sized> DerefMut for RwLockWriteGuard<'_, T> {
deref_mut(&mut self) -> &mut T336     fn deref_mut(&mut self) -> &mut T {
337         unsafe { &mut *self.0.data.get() }
338     }
339 }
340 
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::{block_on, spawn};

    /// UT test cases for Rwlock::new()
    ///
    /// # Brief
    /// 1. Create one lock around a struct and another around an integer
    /// 2. Read both locks and verify the stored contents
    #[test]
    fn ut_rwlock_new_01() {
        pub struct Test {
            flag: bool,
            num: usize,
        }
        block_on(async {
            let struct_lock = RwLock::new(Test { flag: true, num: 1 });
            assert!(struct_lock.read().await.flag);
            assert_eq!(struct_lock.read().await.num, 1);

            let int_lock = RwLock::new(0);
            assert_eq!(*int_lock.read().await, 0);
        });
    }

    /// UT test cases for Rwlock::read()
    ///
    /// # Brief
    /// 1. Create a lock
    /// 2. Call read()
    /// 3. Verify the value seen through the read guard
    #[test]
    fn ut_rwlock_read_01() {
        block_on(async {
            let lock = RwLock::new(100);
            let guard = lock.read().await;
            assert_eq!(*guard, 100);
        });
    }

    /// UT test cases for Rwlock::read()
    ///
    /// # Brief
    /// 1. Create a shared lock
    /// 2. Spawn a task that takes the write lock and increments the value
    /// 3. After that task has finished, call read() and verify the new value
    #[test]
    fn ut_rwlock_read_02() {
        let lock = Arc::new(RwLock::new(100));
        let writer_lock = Arc::clone(&lock);

        let writer = spawn(async move {
            let mut guard = writer_lock.write().await;
            *guard += 1;
        });
        block_on(writer).unwrap();

        block_on(async {
            let guard = lock.read().await;
            assert_eq!(*guard, 101);
        });
    }

    /// UT test cases for Rwlock::try_read()
    ///
    /// # Brief
    /// 1. Create a lock
    /// 2. Call try_read()
    /// 3. Verify the value seen through the returned guard
    #[test]
    fn ut_rwlock_try_read_01() {
        let lock = RwLock::new(100);
        let guard = lock.try_read().unwrap();
        assert_eq!(*guard, 100);
    }

    /// UT test cases for Rwlock::try_read()
    ///
    /// # Brief
    /// 1. Create a lock and take the write lock with try_write()
    /// 2. Verify that try_read() fails while the write guard is alive
    /// 3. Drop the write guard and verify that try_read() now succeeds
    #[test]
    fn ut_rwlock_try_read_02() {
        let lock = Arc::new(RwLock::new(100));
        let mut writer = lock.try_write().unwrap();
        *writer += 1;
        assert!(lock.try_read().is_err());
        *writer += 1;
        drop(writer);
        assert!(lock.try_read().is_ok());
    }

    /// UT test cases for Rwlock::write()
    ///
    /// # Brief
    /// 1. Create a lock
    /// 2. Take the write lock and modify the value
    /// 3. Verify the value seen through the write guard
    #[test]
    fn ut_rwlock_write_01() {
        let lock = Arc::new(RwLock::new(100));
        block_on(async {
            let mut guard = lock.write().await;
            *guard += 100;
            assert_eq!(*guard, 200);
        });
    }

    /// UT test cases for Rwlock::write()
    ///
    /// # Brief
    /// 1. Create a shared lock
    /// 2. Spawn two tasks that each take the write lock and add 100
    /// 3. Wait for both writers to finish
    /// 4. Spawn a reader task and verify that it sees both additions
    #[test]
    fn ut_rwlock_write_test_02() {
        let lock = Arc::new(RwLock::new(100));
        let first_writer_lock = Arc::clone(&lock);
        let reader_lock = Arc::clone(&lock);
        let second_writer_lock = lock;

        let first_writer = spawn(async move {
            let mut guard = first_writer_lock.write().await;
            *guard += 100;
        });
        let second_writer = spawn(async move {
            let mut guard = second_writer_lock.write().await;
            *guard += 100;
        });
        block_on(first_writer).unwrap();
        block_on(second_writer).unwrap();

        let reader = spawn(async move {
            let guard = reader_lock.read().await;
            assert_eq!(*guard, 300);
        });
        block_on(reader).unwrap();
    }

    /// UT test cases for Rwlock::try_write()
    ///
    /// # Brief
    /// 1. Create a lock
    /// 2. Call try_write() and modify the value
    /// 3. Verify the value seen through the write guard
    #[test]
    fn ut_rwlock_try_write_01() {
        let lock = RwLock::new(100);
        let mut guard = lock.try_write().unwrap();
        *guard += 100;
        assert_eq!(*guard, 200);
    }

    /// UT test cases for Rwlock::try_write()
    ///
    /// # Brief
    /// 1. Create a lock and take the write lock with try_write()
    /// 2. Verify that a second try_write() fails while the guard is alive
    /// 3. Drop the guard and verify that try_write() now succeeds
    #[test]
    fn ut_rwlock_try_write_02() {
        let lock = Arc::new(RwLock::new(100));
        let mut writer = lock.try_write().unwrap();
        *writer += 1;
        assert!(lock.try_write().is_err());
        *writer += 1;
        drop(writer);
        assert!(lock.try_write().is_ok());
    }

    /// UT test cases for Rwlock::into_inner()
    ///
    /// # Brief
    /// 1. Create a lock around a value
    /// 2. Call into_inner() and verify that the original value is returned
    #[test]
    fn ut_rwlock_into_inner_01() {
        let lock = RwLock::new(10);
        assert_eq!(lock.into_inner(), 10);
    }
}
539