1 //! The motivation for SharedMutex is to guard a resource without having to 2 //! extend its lifetime using an Rc<> (and potentially create reference cycles) 3 4 use std::future::Future; 5 use std::rc::Rc; 6 use std::sync::Arc; 7 8 use tokio::sync::{Mutex, OwnedMutexGuard, Semaphore, TryLockError}; 9 10 /// A mutex wrapping some contents of type T. When the mutex is dropped, 11 /// T will be dropped. 12 /// 13 /// The lifetime of T will be extended only if a client currently holds 14 /// exclusive access to it, in which case once that client drops its 15 /// MutexGuard, then T will be dropped. 16 pub struct SharedMutex<T> { 17 lock: Arc<Mutex<T>>, 18 on_death: Rc<Semaphore>, 19 } 20 21 impl<T> SharedMutex<T> { 22 /// Constructor new(t: T) -> Self23 pub fn new(t: T) -> Self { 24 Self { lock: Arc::new(Mutex::new(t)), on_death: Rc::new(Semaphore::new(0)) } 25 } 26 27 /// Acquire exclusive access to T, or None if SharedMutex<T> is dropped 28 /// while waiting to acquire. Unlike Mutex::lock, this method produces a 29 /// future with 'static lifetime, so it can be awaited even if the 30 /// SharedMutex<> itself is dropped. 31 /// 32 /// NOTE: if the lifetime of T is extended by the holder of the lock when 33 /// the SharedMutex<> itself is dropped, all waiters will still 34 /// instantly return None (rather than waiting for the lock to be 35 /// released). lock(&self) -> impl Future<Output = Option<OwnedMutexGuard<T>>>36 pub fn lock(&self) -> impl Future<Output = Option<OwnedMutexGuard<T>>> { 37 let mutex = self.lock.clone(); 38 let on_death = self.on_death.clone(); 39 40 async move { 41 tokio::select! { 42 biased; 43 permit = on_death.acquire() => { 44 drop(permit); 45 None 46 }, 47 acquired = mutex.lock_owned() => { 48 Some(acquired) 49 }, 50 } 51 } 52 } 53 54 /// Synchronously acquire the lock. This similarly exhibits the 55 /// lifetime-extension behavior of Self#lock(). try_lock(&self) -> Result<OwnedMutexGuard<T>, TryLockError>56 pub fn try_lock(&self) -> Result<OwnedMutexGuard<T>, TryLockError> { 57 self.lock.clone().try_lock_owned() 58 } 59 } 60 61 impl<T> Drop for SharedMutex<T> { drop(&mut self)62 fn drop(&mut self) { 63 // mark dead, so all waiters instantly return 64 // no one can start to wait after drop 65 self.on_death.add_permits(Arc::strong_count(&self.lock) + 1); 66 } 67 } 68