• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! The motivation for SharedMutex is to guard a resource without having to
2 //! extend its lifetime using an Rc<> (and potentially create reference cycles)
3 
4 use std::future::Future;
5 use std::rc::Rc;
6 use std::sync::Arc;
7 
8 use tokio::sync::{Mutex, OwnedMutexGuard, Semaphore, TryLockError};
9 
10 /// A mutex wrapping some contents of type T. When the mutex is dropped,
11 /// T will be dropped.
12 ///
13 /// The lifetime of T will be extended only if a client currently holds
14 /// exclusive access to it, in which case once that client drops its
15 /// MutexGuard, then T will be dropped.
16 pub struct SharedMutex<T> {
17     lock: Arc<Mutex<T>>,
18     on_death: Rc<Semaphore>,
19 }
20 
21 impl<T> SharedMutex<T> {
22     /// Constructor
new(t: T) -> Self23     pub fn new(t: T) -> Self {
24         Self { lock: Arc::new(Mutex::new(t)), on_death: Rc::new(Semaphore::new(0)) }
25     }
26 
27     /// Acquire exclusive access to T, or None if SharedMutex<T> is dropped
28     /// while waiting to acquire. Unlike Mutex::lock, this method produces a
29     /// future with 'static lifetime, so it can be awaited even if the
30     /// SharedMutex<> itself is dropped.
31     ///
32     /// NOTE: if the lifetime of T is extended by the holder of the lock when
33     /// the SharedMutex<> itself is dropped, all waiters will still
34     /// instantly return None (rather than waiting for the lock to be
35     /// released).
lock(&self) -> impl Future<Output = Option<OwnedMutexGuard<T>>>36     pub fn lock(&self) -> impl Future<Output = Option<OwnedMutexGuard<T>>> {
37         let mutex = self.lock.clone();
38         let on_death = self.on_death.clone();
39 
40         async move {
41             tokio::select! {
42             biased;
43               permit = on_death.acquire() => {
44                 drop(permit);
45                 None
46               },
47               acquired = mutex.lock_owned() => {
48                 Some(acquired)
49               },
50             }
51         }
52     }
53 
54     /// Synchronously acquire the lock. This similarly exhibits the
55     /// lifetime-extension behavior of Self#lock().
try_lock(&self) -> Result<OwnedMutexGuard<T>, TryLockError>56     pub fn try_lock(&self) -> Result<OwnedMutexGuard<T>, TryLockError> {
57         self.lock.clone().try_lock_owned()
58     }
59 }
60 
61 impl<T> Drop for SharedMutex<T> {
drop(&mut self)62     fn drop(&mut self) {
63         // mark dead, so all waiters instantly return
64         // no one can start to wait after drop
65         self.on_death.add_permits(Arc::strong_count(&self.lock) + 1);
66     }
67 }
68