• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))]
2 
3 use crate::sync::batch_semaphore as semaphore;
4 
5 use std::cell::UnsafeCell;
6 use std::error::Error;
7 use std::fmt;
8 use std::ops::{Deref, DerefMut};
9 use std::sync::Arc;
10 
/// An asynchronous `Mutex`-like type.
///
/// This type acts similarly to [`std::sync::Mutex`], with two major
/// differences: [`lock`] is an async method so does not block, and the lock
/// guard is designed to be held across `.await` points.
///
/// # Which kind of mutex should you use?
///
/// Contrary to popular belief, it is ok and often preferred to use the ordinary
/// [`Mutex`][std] from the standard library in asynchronous code.
///
/// The feature that the async mutex offers over the blocking mutex is the
/// ability to keep it locked across an `.await` point. This makes the async
/// mutex more expensive than the blocking mutex, so the blocking mutex should
/// be preferred in the cases where it can be used. The primary use case for the
/// async mutex is to provide shared mutable access to IO resources such as a
/// database connection. If the value behind the mutex is just data, it's
/// usually appropriate to use a blocking mutex such as the one in the standard
/// library or [`parking_lot`].
///
/// Note that, although the compiler will not prevent the std `Mutex` from holding
/// its guard across `.await` points in situations where the task is not movable
/// between threads, this virtually never leads to correct concurrent code in
/// practice as it can easily lead to deadlocks.
///
/// A common pattern is to wrap the `Arc<Mutex<...>>` in a struct that provides
/// non-async methods for performing operations on the data within, and only
/// lock the mutex inside these methods. The [mini-redis] example provides an
/// illustration of this pattern.
///
/// Additionally, when you _do_ want shared access to an IO resource, it is
/// often better to spawn a task to manage the IO resource, and to use message
/// passing to communicate with that task.
///
/// [std]: std::sync::Mutex
/// [`parking_lot`]: https://docs.rs/parking_lot
/// [mini-redis]: https://github.com/tokio-rs/mini-redis/blob/master/src/db.rs
///
/// # Examples:
///
/// ```rust,no_run
/// use tokio::sync::Mutex;
/// use std::sync::Arc;
///
/// #[tokio::main]
/// async fn main() {
///     let data1 = Arc::new(Mutex::new(0));
///     let data2 = Arc::clone(&data1);
///
///     tokio::spawn(async move {
///         let mut lock = data2.lock().await;
///         *lock += 1;
///     });
///
///     let mut lock = data1.lock().await;
///     *lock += 1;
/// }
/// ```
///
///
/// ```rust,no_run
/// use tokio::sync::Mutex;
/// use std::sync::Arc;
///
/// #[tokio::main]
/// async fn main() {
///     let count = Arc::new(Mutex::new(0));
///
///     for i in 0..5 {
///         let my_count = Arc::clone(&count);
///         tokio::spawn(async move {
///             for j in 0..10 {
///                 let mut lock = my_count.lock().await;
///                 *lock += 1;
///                 println!("{} {} {}", i, j, lock);
///             }
///         });
///     }
///
///     loop {
///         if *count.lock().await >= 50 {
///             break;
///         }
///     }
///     println!("Count hit 50.");
/// }
/// ```
/// There are a few things of note here to pay attention to in this example.
/// 1. The mutex is wrapped in an [`Arc`] to allow it to be shared across
///    threads.
/// 2. Each spawned task obtains a lock and releases it on every iteration.
/// 3. Mutation of the data protected by the Mutex is done by de-referencing
///    the obtained lock as seen on lines 13 and 20.
///
/// Tokio's Mutex works in a simple FIFO (first in, first out) style where all
/// calls to [`lock`] complete in the order they were performed. In that way the
/// Mutex is "fair" and predictable in how it distributes the locks to inner
/// data. Locks are released and reacquired after every iteration, so basically,
/// each thread goes to the back of the line after it increments the value once.
/// Note that there's some unpredictability to the timing between when the
/// threads are started, but once they are going they alternate predictably.
/// Finally, since there is only a single valid lock at any given time, there is
/// no possibility of a race condition when mutating the inner value.
///
/// Note that in contrast to [`std::sync::Mutex`], this implementation does not
/// poison the mutex when a thread holding the [`MutexGuard`] panics. In such a
/// case, the mutex will be unlocked. If the panic is caught, this might leave
/// the data protected by the mutex in an inconsistent state.
///
/// [`Mutex`]: struct@Mutex
/// [`MutexGuard`]: struct@MutexGuard
/// [`Arc`]: struct@std::sync::Arc
/// [`std::sync::Mutex`]: struct@std::sync::Mutex
/// [`Send`]: trait@std::marker::Send
/// [`lock`]: method@Mutex::lock
pub struct Mutex<T: ?Sized> {
    // Semaphore created with exactly one permit; holding that permit is what
    // "holding the lock" means.
    s: semaphore::Semaphore,
    // The protected value. It is only reached through a guard (which holds
    // the permit) or through exclusive/owned access to the mutex itself.
    c: UnsafeCell<T>,
}
130 
131 /// A handle to a held `Mutex`. The guard can be held across any `.await` point
132 /// as it is [`Send`].
133 ///
134 /// As long as you have this guard, you have exclusive access to the underlying
135 /// `T`. The guard internally borrows the `Mutex`, so the mutex will not be
136 /// dropped while a guard exists.
137 ///
138 /// The lock is automatically released whenever the guard is dropped, at which
139 /// point `lock` will succeed yet again.
140 pub struct MutexGuard<'a, T: ?Sized> {
141     lock: &'a Mutex<T>,
142 }
143 
144 /// An owned handle to a held `Mutex`.
145 ///
146 /// This guard is only available from a `Mutex` that is wrapped in an [`Arc`]. It
147 /// is identical to `MutexGuard`, except that rather than borrowing the `Mutex`,
148 /// it clones the `Arc`, incrementing the reference count. This means that
149 /// unlike `MutexGuard`, it will have the `'static` lifetime.
150 ///
151 /// As long as you have this guard, you have exclusive access to the underlying
152 /// `T`. The guard internally keeps a reference-counted pointer to the original
153 /// `Mutex`, so even if the lock goes away, the guard remains valid.
154 ///
155 /// The lock is automatically released whenever the guard is dropped, at which
156 /// point `lock` will succeed yet again.
157 ///
158 /// [`Arc`]: std::sync::Arc
159 pub struct OwnedMutexGuard<T: ?Sized> {
160     lock: Arc<Mutex<T>>,
161 }
162 
163 // As long as T: Send, it's fine to send and share Mutex<T> between threads.
164 // If T was not Send, sending and sharing a Mutex<T> would be bad, since you can
165 // access T through Mutex<T>.
166 unsafe impl<T> Send for Mutex<T> where T: ?Sized + Send {}
167 unsafe impl<T> Sync for Mutex<T> where T: ?Sized + Send {}
168 unsafe impl<T> Sync for MutexGuard<'_, T> where T: ?Sized + Send + Sync {}
169 unsafe impl<T> Sync for OwnedMutexGuard<T> where T: ?Sized + Send + Sync {}
170 
/// Error returned from the [`Mutex::try_lock`], [`RwLock::try_read`] and
/// [`RwLock::try_write`] functions.
///
/// `Mutex::try_lock` operation will only fail if the mutex is already locked.
///
/// `RwLock::try_read` operation will only fail if the lock is currently held
/// by an exclusive writer.
///
/// `RwLock::try_write` operation will only fail if the lock is currently held
/// by any reader or by an exclusive writer.
///
/// [`Mutex::try_lock`]: Mutex::try_lock
/// [`RwLock::try_read`]: fn@super::RwLock::try_read
/// [`RwLock::try_write`]: fn@super::RwLock::try_write
#[derive(Debug)]
pub struct TryLockError(pub(super) ());
187 
188 impl fmt::Display for TryLockError {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result189     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
190         write!(fmt, "operation would block")
191     }
192 }
193 
// Marks `TryLockError` as a standard error type. The body is empty, so every
// `Error` method keeps its default implementation.
impl Error for TryLockError {}
195 
/// Compile-time checks of the auto-trait and lifetime bounds of the mutex,
/// its guards, and the futures returned by the locking methods.
#[test]
#[cfg(not(loom))]
fn bounds() {
    // Type-level assertions: each helper only compiles for a `T` that
    // satisfies the named bound.
    fn assert_send<T: Send>() {}
    fn assert_unpin<T: Unpin>() {}
    fn assert_send_sync<T: Send + Sync>() {}
    fn assert_static<T: 'static>() {}
    // Value-level assertions: needed for the futures returned by the async
    // fns, since their types are unnameable.
    fn assert_send_sync_val<T: Send + Sync>(_value: T) {}
    fn assert_static_val<T: 'static>(_value: T) {}

    // The mutex itself.
    assert_unpin::<Mutex<u32>>();
    assert_send_sync::<Mutex<u32>>();

    // The guards.
    assert_send::<MutexGuard<'_, u32>>();
    assert_send::<OwnedMutexGuard<u32>>();
    assert_static::<OwnedMutexGuard<u32>>();

    // The future returned by the borrowing `lock`.
    let mutex = Mutex::new(1);
    assert_send_sync_val(mutex.lock());

    // The future returned by the owning `lock_owned`.
    let arc_mutex = Arc::new(Mutex::new(1));
    assert_send_sync_val(arc_mutex.clone().lock_owned());
    assert_static_val(arc_mutex.lock_owned());
}
219 
220 impl<T: ?Sized> Mutex<T> {
221     /// Creates a new lock in an unlocked state ready for use.
222     ///
223     /// # Examples
224     ///
225     /// ```
226     /// use tokio::sync::Mutex;
227     ///
228     /// let lock = Mutex::new(5);
229     /// ```
new(t: T) -> Self where T: Sized,230     pub fn new(t: T) -> Self
231     where
232         T: Sized,
233     {
234         Self {
235             c: UnsafeCell::new(t),
236             s: semaphore::Semaphore::new(1),
237         }
238     }
239 
240     /// Creates a new lock in an unlocked state ready for use.
241     ///
242     /// # Examples
243     ///
244     /// ```
245     /// use tokio::sync::Mutex;
246     ///
247     /// static LOCK: Mutex<i32> = Mutex::const_new(5);
248     /// ```
249     #[cfg(all(feature = "parking_lot", not(all(loom, test)),))]
250     #[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))]
const_new(t: T) -> Self where T: Sized,251     pub const fn const_new(t: T) -> Self
252     where
253         T: Sized,
254     {
255         Self {
256             c: UnsafeCell::new(t),
257             s: semaphore::Semaphore::const_new(1),
258         }
259     }
260 
261     /// Locks this mutex, causing the current task
262     /// to yield until the lock has been acquired.
263     /// When the lock has been acquired, function returns a [`MutexGuard`].
264     ///
265     /// # Examples
266     ///
267     /// ```
268     /// use tokio::sync::Mutex;
269     ///
270     /// #[tokio::main]
271     /// async fn main() {
272     ///     let mutex = Mutex::new(1);
273     ///
274     ///     let mut n = mutex.lock().await;
275     ///     *n = 2;
276     /// }
277     /// ```
lock(&self) -> MutexGuard<'_, T>278     pub async fn lock(&self) -> MutexGuard<'_, T> {
279         self.acquire().await;
280         MutexGuard { lock: self }
281     }
282 
283     /// Locks this mutex, causing the current task to yield until the lock has
284     /// been acquired. When the lock has been acquired, this returns an
285     /// [`OwnedMutexGuard`].
286     ///
287     /// This method is identical to [`Mutex::lock`], except that the returned
288     /// guard references the `Mutex` with an [`Arc`] rather than by borrowing
289     /// it. Therefore, the `Mutex` must be wrapped in an `Arc` to call this
290     /// method, and the guard will live for the `'static` lifetime, as it keeps
291     /// the `Mutex` alive by holding an `Arc`.
292     ///
293     /// # Examples
294     ///
295     /// ```
296     /// use tokio::sync::Mutex;
297     /// use std::sync::Arc;
298     ///
299     /// #[tokio::main]
300     /// async fn main() {
301     ///     let mutex = Arc::new(Mutex::new(1));
302     ///
303     ///     let mut n = mutex.clone().lock_owned().await;
304     ///     *n = 2;
305     /// }
306     /// ```
307     ///
308     /// [`Arc`]: std::sync::Arc
lock_owned(self: Arc<Self>) -> OwnedMutexGuard<T>309     pub async fn lock_owned(self: Arc<Self>) -> OwnedMutexGuard<T> {
310         self.acquire().await;
311         OwnedMutexGuard { lock: self }
312     }
313 
acquire(&self)314     async fn acquire(&self) {
315         self.s.acquire(1).await.unwrap_or_else(|_| {
316             // The semaphore was closed. but, we never explicitly close it, and
317             // we own it exclusively, which means that this can never happen.
318             unreachable!()
319         });
320     }
321 
322     /// Attempts to acquire the lock, and returns [`TryLockError`] if the
323     /// lock is currently held somewhere else.
324     ///
325     /// [`TryLockError`]: TryLockError
326     /// # Examples
327     ///
328     /// ```
329     /// use tokio::sync::Mutex;
330     /// # async fn dox() -> Result<(), tokio::sync::TryLockError> {
331     ///
332     /// let mutex = Mutex::new(1);
333     ///
334     /// let n = mutex.try_lock()?;
335     /// assert_eq!(*n, 1);
336     /// # Ok(())
337     /// # }
338     /// ```
try_lock(&self) -> Result<MutexGuard<'_, T>, TryLockError>339     pub fn try_lock(&self) -> Result<MutexGuard<'_, T>, TryLockError> {
340         match self.s.try_acquire(1) {
341             Ok(_) => Ok(MutexGuard { lock: self }),
342             Err(_) => Err(TryLockError(())),
343         }
344     }
345 
346     /// Returns a mutable reference to the underlying data.
347     ///
348     /// Since this call borrows the `Mutex` mutably, no actual locking needs to
349     /// take place -- the mutable borrow statically guarantees no locks exist.
350     ///
351     /// # Examples
352     ///
353     /// ```
354     /// use tokio::sync::Mutex;
355     ///
356     /// fn main() {
357     ///     let mut mutex = Mutex::new(1);
358     ///
359     ///     let n = mutex.get_mut();
360     ///     *n = 2;
361     /// }
362     /// ```
get_mut(&mut self) -> &mut T363     pub fn get_mut(&mut self) -> &mut T {
364         unsafe {
365             // Safety: This is https://github.com/rust-lang/rust/pull/76936
366             &mut *self.c.get()
367         }
368     }
369 
370     /// Attempts to acquire the lock, and returns [`TryLockError`] if the lock
371     /// is currently held somewhere else.
372     ///
373     /// This method is identical to [`Mutex::try_lock`], except that the
374     /// returned  guard references the `Mutex` with an [`Arc`] rather than by
375     /// borrowing it. Therefore, the `Mutex` must be wrapped in an `Arc` to call
376     /// this method, and the guard will live for the `'static` lifetime, as it
377     /// keeps the `Mutex` alive by holding an `Arc`.
378     ///
379     /// [`TryLockError`]: TryLockError
380     /// [`Arc`]: std::sync::Arc
381     /// # Examples
382     ///
383     /// ```
384     /// use tokio::sync::Mutex;
385     /// use std::sync::Arc;
386     /// # async fn dox() -> Result<(), tokio::sync::TryLockError> {
387     ///
388     /// let mutex = Arc::new(Mutex::new(1));
389     ///
390     /// let n = mutex.clone().try_lock_owned()?;
391     /// assert_eq!(*n, 1);
392     /// # Ok(())
393     /// # }
try_lock_owned(self: Arc<Self>) -> Result<OwnedMutexGuard<T>, TryLockError>394     pub fn try_lock_owned(self: Arc<Self>) -> Result<OwnedMutexGuard<T>, TryLockError> {
395         match self.s.try_acquire(1) {
396             Ok(_) => Ok(OwnedMutexGuard { lock: self }),
397             Err(_) => Err(TryLockError(())),
398         }
399     }
400 
401     /// Consumes the mutex, returning the underlying data.
402     /// # Examples
403     ///
404     /// ```
405     /// use tokio::sync::Mutex;
406     ///
407     /// #[tokio::main]
408     /// async fn main() {
409     ///     let mutex = Mutex::new(1);
410     ///
411     ///     let n = mutex.into_inner();
412     ///     assert_eq!(n, 1);
413     /// }
414     /// ```
into_inner(self) -> T where T: Sized,415     pub fn into_inner(self) -> T
416     where
417         T: Sized,
418     {
419         self.c.into_inner()
420     }
421 }
422 
423 impl<T> From<T> for Mutex<T> {
from(s: T) -> Self424     fn from(s: T) -> Self {
425         Self::new(s)
426     }
427 }
428 
429 impl<T> Default for Mutex<T>
430 where
431     T: Default,
432 {
default() -> Self433     fn default() -> Self {
434         Self::new(T::default())
435     }
436 }
437 
438 impl<T> std::fmt::Debug for Mutex<T>
439 where
440     T: std::fmt::Debug,
441 {
fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result442     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
443         let mut d = f.debug_struct("Mutex");
444         match self.try_lock() {
445             Ok(inner) => d.field("data", &*inner),
446             Err(_) => d.field("data", &format_args!("<locked>")),
447         };
448         d.finish()
449     }
450 }
451 
452 // === impl MutexGuard ===
453 
454 impl<T: ?Sized> Drop for MutexGuard<'_, T> {
drop(&mut self)455     fn drop(&mut self) {
456         self.lock.s.release(1)
457     }
458 }
459 
460 impl<T: ?Sized> Deref for MutexGuard<'_, T> {
461     type Target = T;
deref(&self) -> &Self::Target462     fn deref(&self) -> &Self::Target {
463         unsafe { &*self.lock.c.get() }
464     }
465 }
466 
467 impl<T: ?Sized> DerefMut for MutexGuard<'_, T> {
deref_mut(&mut self) -> &mut Self::Target468     fn deref_mut(&mut self) -> &mut Self::Target {
469         unsafe { &mut *self.lock.c.get() }
470     }
471 }
472 
473 impl<T: ?Sized + fmt::Debug> fmt::Debug for MutexGuard<'_, T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result474     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
475         fmt::Debug::fmt(&**self, f)
476     }
477 }
478 
479 impl<T: ?Sized + fmt::Display> fmt::Display for MutexGuard<'_, T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result480     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
481         fmt::Display::fmt(&**self, f)
482     }
483 }
484 
485 // === impl OwnedMutexGuard ===
486 
487 impl<T: ?Sized> Drop for OwnedMutexGuard<T> {
drop(&mut self)488     fn drop(&mut self) {
489         self.lock.s.release(1)
490     }
491 }
492 
493 impl<T: ?Sized> Deref for OwnedMutexGuard<T> {
494     type Target = T;
deref(&self) -> &Self::Target495     fn deref(&self) -> &Self::Target {
496         unsafe { &*self.lock.c.get() }
497     }
498 }
499 
500 impl<T: ?Sized> DerefMut for OwnedMutexGuard<T> {
deref_mut(&mut self) -> &mut Self::Target501     fn deref_mut(&mut self) -> &mut Self::Target {
502         unsafe { &mut *self.lock.c.get() }
503     }
504 }
505 
506 impl<T: ?Sized + fmt::Debug> fmt::Debug for OwnedMutexGuard<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result507     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
508         fmt::Debug::fmt(&**self, f)
509     }
510 }
511 
512 impl<T: ?Sized + fmt::Display> fmt::Display for OwnedMutexGuard<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result513     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
514         fmt::Display::fmt(&**self, f)
515     }
516 }
517