1 #![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))]
2
3 use crate::sync::batch_semaphore as semaphore;
4
5 use std::cell::UnsafeCell;
6 use std::error::Error;
7 use std::fmt;
8 use std::ops::{Deref, DerefMut};
9 use std::sync::Arc;
10
11 /// An asynchronous `Mutex`-like type.
12 ///
13 /// This type acts similarly to [`std::sync::Mutex`], with two major
14 /// differences: [`lock`] is an async method so does not block, and the lock
15 /// guard is designed to be held across `.await` points.
16 ///
17 /// # Which kind of mutex should you use?
18 ///
19 /// Contrary to popular belief, it is ok and often preferred to use the ordinary
20 /// [`Mutex`][std] from the standard library in asynchronous code.
21 ///
22 /// The feature that the async mutex offers over the blocking mutex is the
23 /// ability to keep it locked across an `.await` point. This makes the async
24 /// mutex more expensive than the blocking mutex, so the blocking mutex should
25 /// be preferred in the cases where it can be used. The primary use case for the
26 /// async mutex is to provide shared mutable access to IO resources such as a
27 /// database connection. If the value behind the mutex is just data, it's
28 /// usually appropriate to use a blocking mutex such as the one in the standard
29 /// library or [`parking_lot`].
30 ///
31 /// Note that, although the compiler will not prevent the std `Mutex` from holding
32 /// its guard across `.await` points in situations where the task is not movable
33 /// between threads, this virtually never leads to correct concurrent code in
34 /// practice as it can easily lead to deadlocks.
35 ///
36 /// A common pattern is to wrap the `Arc<Mutex<...>>` in a struct that provides
37 /// non-async methods for performing operations on the data within, and only
38 /// lock the mutex inside these methods. The [mini-redis] example provides an
39 /// illustration of this pattern.
40 ///
41 /// Additionally, when you _do_ want shared access to an IO resource, it is
42 /// often better to spawn a task to manage the IO resource, and to use message
43 /// passing to communicate with that task.
44 ///
45 /// [std]: std::sync::Mutex
46 /// [`parking_lot`]: https://docs.rs/parking_lot
47 /// [mini-redis]: https://github.com/tokio-rs/mini-redis/blob/master/src/db.rs
48 ///
49 /// # Examples:
50 ///
51 /// ```rust,no_run
52 /// use tokio::sync::Mutex;
53 /// use std::sync::Arc;
54 ///
55 /// #[tokio::main]
56 /// async fn main() {
57 /// let data1 = Arc::new(Mutex::new(0));
58 /// let data2 = Arc::clone(&data1);
59 ///
60 /// tokio::spawn(async move {
61 /// let mut lock = data2.lock().await;
62 /// *lock += 1;
63 /// });
64 ///
65 /// let mut lock = data1.lock().await;
66 /// *lock += 1;
67 /// }
68 /// ```
69 ///
70 ///
71 /// ```rust,no_run
72 /// use tokio::sync::Mutex;
73 /// use std::sync::Arc;
74 ///
75 /// #[tokio::main]
76 /// async fn main() {
77 /// let count = Arc::new(Mutex::new(0));
78 ///
79 /// for i in 0..5 {
80 /// let my_count = Arc::clone(&count);
81 /// tokio::spawn(async move {
82 /// for j in 0..10 {
83 /// let mut lock = my_count.lock().await;
84 /// *lock += 1;
85 /// println!("{} {} {}", i, j, lock);
86 /// }
87 /// });
88 /// }
89 ///
90 /// loop {
91 /// if *count.lock().await >= 50 {
92 /// break;
93 /// }
94 /// }
95 /// println!("Count hit 50.");
96 /// }
97 /// ```
98 /// There are a few things of note here to pay attention to in this example.
99 /// 1. The mutex is wrapped in an [`Arc`] to allow it to be shared across
100 /// threads.
101 /// 2. Each spawned task obtains a lock and releases it on every iteration.
102 /// 3. Mutation of the data protected by the Mutex is done by de-referencing
103 /// the obtained lock as seen on lines 12 and 19.
104 ///
105 /// Tokio's Mutex works in a simple FIFO (first in, first out) style where all
106 /// calls to [`lock`] complete in the order they were performed. In that way the
107 /// Mutex is "fair" and predictable in how it distributes the locks to inner
108 /// data. Locks are released and reacquired after every iteration, so basically,
109 /// each thread goes to the back of the line after it increments the value once.
110 /// Note that there's some unpredictability to the timing between when the
111 /// threads are started, but once they are going they alternate predictably.
112 /// Finally, since there is only a single valid lock at any given time, there is
113 /// no possibility of a race condition when mutating the inner value.
114 ///
115 /// Note that in contrast to [`std::sync::Mutex`], this implementation does not
116 /// poison the mutex when a thread holding the [`MutexGuard`] panics. In such a
117 /// case, the mutex will be unlocked. If the panic is caught, this might leave
118 /// the data protected by the mutex in an inconsistent state.
119 ///
120 /// [`Mutex`]: struct@Mutex
121 /// [`MutexGuard`]: struct@MutexGuard
122 /// [`Arc`]: struct@std::sync::Arc
123 /// [`std::sync::Mutex`]: struct@std::sync::Mutex
124 /// [`Send`]: trait@std::marker::Send
125 /// [`lock`]: method@Mutex::lock
126 pub struct Mutex<T: ?Sized> {
127 s: semaphore::Semaphore,
128 c: UnsafeCell<T>,
129 }
130
131 /// A handle to a held `Mutex`. The guard can be held across any `.await` point
132 /// as it is [`Send`].
133 ///
134 /// As long as you have this guard, you have exclusive access to the underlying
135 /// `T`. The guard internally borrows the `Mutex`, so the mutex will not be
136 /// dropped while a guard exists.
137 ///
138 /// The lock is automatically released whenever the guard is dropped, at which
139 /// point `lock` will succeed yet again.
140 pub struct MutexGuard<'a, T: ?Sized> {
141 lock: &'a Mutex<T>,
142 }
143
144 /// An owned handle to a held `Mutex`.
145 ///
146 /// This guard is only available from a `Mutex` that is wrapped in an [`Arc`]. It
147 /// is identical to `MutexGuard`, except that rather than borrowing the `Mutex`,
148 /// it clones the `Arc`, incrementing the reference count. This means that
149 /// unlike `MutexGuard`, it will have the `'static` lifetime.
150 ///
151 /// As long as you have this guard, you have exclusive access to the underlying
152 /// `T`. The guard internally keeps a reference-counted pointer to the original
153 /// `Mutex`, so even if the lock goes away, the guard remains valid.
154 ///
155 /// The lock is automatically released whenever the guard is dropped, at which
156 /// point `lock` will succeed yet again.
157 ///
158 /// [`Arc`]: std::sync::Arc
159 pub struct OwnedMutexGuard<T: ?Sized> {
160 lock: Arc<Mutex<T>>,
161 }
162
163 // As long as T: Send, it's fine to send and share Mutex<T> between threads.
164 // If T was not Send, sending and sharing a Mutex<T> would be bad, since you can
165 // access T through Mutex<T>.
166 unsafe impl<T> Send for Mutex<T> where T: ?Sized + Send {}
167 unsafe impl<T> Sync for Mutex<T> where T: ?Sized + Send {}
168 unsafe impl<T> Sync for MutexGuard<'_, T> where T: ?Sized + Send + Sync {}
169 unsafe impl<T> Sync for OwnedMutexGuard<T> where T: ?Sized + Send + Sync {}
170
171 /// Error returned from the [`Mutex::try_lock`], [`RwLock::try_read`] and
172 /// [`RwLock::try_write`] functions.
173 ///
174 /// `Mutex::try_lock` operation will only fail if the mutex is already locked.
175 ///
176 /// `RwLock::try_read` operation will only fail if the lock is currently held
177 /// by an exclusive writer.
178 ///
179 /// `RwLock::try_write` operation will if lock is held by any reader or by an
180 /// exclusive writer.
181 ///
182 /// [`Mutex::try_lock`]: Mutex::try_lock
183 /// [`RwLock::try_read`]: fn@super::RwLock::try_read
184 /// [`RwLock::try_write`]: fn@super::RwLock::try_write
185 #[derive(Debug)]
186 pub struct TryLockError(pub(super) ());
187
188 impl fmt::Display for TryLockError {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result189 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
190 write!(fmt, "operation would block")
191 }
192 }
193
194 impl Error for TryLockError {}
195
196 #[test]
197 #[cfg(not(loom))]
bounds()198 fn bounds() {
199 fn check_send<T: Send>() {}
200 fn check_unpin<T: Unpin>() {}
201 // This has to take a value, since the async fn's return type is unnameable.
202 fn check_send_sync_val<T: Send + Sync>(_t: T) {}
203 fn check_send_sync<T: Send + Sync>() {}
204 fn check_static<T: 'static>() {}
205 fn check_static_val<T: 'static>(_t: T) {}
206
207 check_send::<MutexGuard<'_, u32>>();
208 check_send::<OwnedMutexGuard<u32>>();
209 check_unpin::<Mutex<u32>>();
210 check_send_sync::<Mutex<u32>>();
211 check_static::<OwnedMutexGuard<u32>>();
212
213 let mutex = Mutex::new(1);
214 check_send_sync_val(mutex.lock());
215 let arc_mutex = Arc::new(Mutex::new(1));
216 check_send_sync_val(arc_mutex.clone().lock_owned());
217 check_static_val(arc_mutex.lock_owned());
218 }
219
220 impl<T: ?Sized> Mutex<T> {
221 /// Creates a new lock in an unlocked state ready for use.
222 ///
223 /// # Examples
224 ///
225 /// ```
226 /// use tokio::sync::Mutex;
227 ///
228 /// let lock = Mutex::new(5);
229 /// ```
new(t: T) -> Self where T: Sized,230 pub fn new(t: T) -> Self
231 where
232 T: Sized,
233 {
234 Self {
235 c: UnsafeCell::new(t),
236 s: semaphore::Semaphore::new(1),
237 }
238 }
239
240 /// Creates a new lock in an unlocked state ready for use.
241 ///
242 /// # Examples
243 ///
244 /// ```
245 /// use tokio::sync::Mutex;
246 ///
247 /// static LOCK: Mutex<i32> = Mutex::const_new(5);
248 /// ```
249 #[cfg(all(feature = "parking_lot", not(all(loom, test)),))]
250 #[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))]
const_new(t: T) -> Self where T: Sized,251 pub const fn const_new(t: T) -> Self
252 where
253 T: Sized,
254 {
255 Self {
256 c: UnsafeCell::new(t),
257 s: semaphore::Semaphore::const_new(1),
258 }
259 }
260
261 /// Locks this mutex, causing the current task
262 /// to yield until the lock has been acquired.
263 /// When the lock has been acquired, function returns a [`MutexGuard`].
264 ///
265 /// # Examples
266 ///
267 /// ```
268 /// use tokio::sync::Mutex;
269 ///
270 /// #[tokio::main]
271 /// async fn main() {
272 /// let mutex = Mutex::new(1);
273 ///
274 /// let mut n = mutex.lock().await;
275 /// *n = 2;
276 /// }
277 /// ```
lock(&self) -> MutexGuard<'_, T>278 pub async fn lock(&self) -> MutexGuard<'_, T> {
279 self.acquire().await;
280 MutexGuard { lock: self }
281 }
282
283 /// Locks this mutex, causing the current task to yield until the lock has
284 /// been acquired. When the lock has been acquired, this returns an
285 /// [`OwnedMutexGuard`].
286 ///
287 /// This method is identical to [`Mutex::lock`], except that the returned
288 /// guard references the `Mutex` with an [`Arc`] rather than by borrowing
289 /// it. Therefore, the `Mutex` must be wrapped in an `Arc` to call this
290 /// method, and the guard will live for the `'static` lifetime, as it keeps
291 /// the `Mutex` alive by holding an `Arc`.
292 ///
293 /// # Examples
294 ///
295 /// ```
296 /// use tokio::sync::Mutex;
297 /// use std::sync::Arc;
298 ///
299 /// #[tokio::main]
300 /// async fn main() {
301 /// let mutex = Arc::new(Mutex::new(1));
302 ///
303 /// let mut n = mutex.clone().lock_owned().await;
304 /// *n = 2;
305 /// }
306 /// ```
307 ///
308 /// [`Arc`]: std::sync::Arc
lock_owned(self: Arc<Self>) -> OwnedMutexGuard<T>309 pub async fn lock_owned(self: Arc<Self>) -> OwnedMutexGuard<T> {
310 self.acquire().await;
311 OwnedMutexGuard { lock: self }
312 }
313
acquire(&self)314 async fn acquire(&self) {
315 self.s.acquire(1).await.unwrap_or_else(|_| {
316 // The semaphore was closed. but, we never explicitly close it, and
317 // we own it exclusively, which means that this can never happen.
318 unreachable!()
319 });
320 }
321
322 /// Attempts to acquire the lock, and returns [`TryLockError`] if the
323 /// lock is currently held somewhere else.
324 ///
325 /// [`TryLockError`]: TryLockError
326 /// # Examples
327 ///
328 /// ```
329 /// use tokio::sync::Mutex;
330 /// # async fn dox() -> Result<(), tokio::sync::TryLockError> {
331 ///
332 /// let mutex = Mutex::new(1);
333 ///
334 /// let n = mutex.try_lock()?;
335 /// assert_eq!(*n, 1);
336 /// # Ok(())
337 /// # }
338 /// ```
try_lock(&self) -> Result<MutexGuard<'_, T>, TryLockError>339 pub fn try_lock(&self) -> Result<MutexGuard<'_, T>, TryLockError> {
340 match self.s.try_acquire(1) {
341 Ok(_) => Ok(MutexGuard { lock: self }),
342 Err(_) => Err(TryLockError(())),
343 }
344 }
345
346 /// Returns a mutable reference to the underlying data.
347 ///
348 /// Since this call borrows the `Mutex` mutably, no actual locking needs to
349 /// take place -- the mutable borrow statically guarantees no locks exist.
350 ///
351 /// # Examples
352 ///
353 /// ```
354 /// use tokio::sync::Mutex;
355 ///
356 /// fn main() {
357 /// let mut mutex = Mutex::new(1);
358 ///
359 /// let n = mutex.get_mut();
360 /// *n = 2;
361 /// }
362 /// ```
get_mut(&mut self) -> &mut T363 pub fn get_mut(&mut self) -> &mut T {
364 unsafe {
365 // Safety: This is https://github.com/rust-lang/rust/pull/76936
366 &mut *self.c.get()
367 }
368 }
369
370 /// Attempts to acquire the lock, and returns [`TryLockError`] if the lock
371 /// is currently held somewhere else.
372 ///
373 /// This method is identical to [`Mutex::try_lock`], except that the
374 /// returned guard references the `Mutex` with an [`Arc`] rather than by
375 /// borrowing it. Therefore, the `Mutex` must be wrapped in an `Arc` to call
376 /// this method, and the guard will live for the `'static` lifetime, as it
377 /// keeps the `Mutex` alive by holding an `Arc`.
378 ///
379 /// [`TryLockError`]: TryLockError
380 /// [`Arc`]: std::sync::Arc
381 /// # Examples
382 ///
383 /// ```
384 /// use tokio::sync::Mutex;
385 /// use std::sync::Arc;
386 /// # async fn dox() -> Result<(), tokio::sync::TryLockError> {
387 ///
388 /// let mutex = Arc::new(Mutex::new(1));
389 ///
390 /// let n = mutex.clone().try_lock_owned()?;
391 /// assert_eq!(*n, 1);
392 /// # Ok(())
393 /// # }
try_lock_owned(self: Arc<Self>) -> Result<OwnedMutexGuard<T>, TryLockError>394 pub fn try_lock_owned(self: Arc<Self>) -> Result<OwnedMutexGuard<T>, TryLockError> {
395 match self.s.try_acquire(1) {
396 Ok(_) => Ok(OwnedMutexGuard { lock: self }),
397 Err(_) => Err(TryLockError(())),
398 }
399 }
400
401 /// Consumes the mutex, returning the underlying data.
402 /// # Examples
403 ///
404 /// ```
405 /// use tokio::sync::Mutex;
406 ///
407 /// #[tokio::main]
408 /// async fn main() {
409 /// let mutex = Mutex::new(1);
410 ///
411 /// let n = mutex.into_inner();
412 /// assert_eq!(n, 1);
413 /// }
414 /// ```
into_inner(self) -> T where T: Sized,415 pub fn into_inner(self) -> T
416 where
417 T: Sized,
418 {
419 self.c.into_inner()
420 }
421 }
422
423 impl<T> From<T> for Mutex<T> {
from(s: T) -> Self424 fn from(s: T) -> Self {
425 Self::new(s)
426 }
427 }
428
429 impl<T> Default for Mutex<T>
430 where
431 T: Default,
432 {
default() -> Self433 fn default() -> Self {
434 Self::new(T::default())
435 }
436 }
437
438 impl<T> std::fmt::Debug for Mutex<T>
439 where
440 T: std::fmt::Debug,
441 {
fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result442 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
443 let mut d = f.debug_struct("Mutex");
444 match self.try_lock() {
445 Ok(inner) => d.field("data", &*inner),
446 Err(_) => d.field("data", &format_args!("<locked>")),
447 };
448 d.finish()
449 }
450 }
451
452 // === impl MutexGuard ===
453
454 impl<T: ?Sized> Drop for MutexGuard<'_, T> {
drop(&mut self)455 fn drop(&mut self) {
456 self.lock.s.release(1)
457 }
458 }
459
460 impl<T: ?Sized> Deref for MutexGuard<'_, T> {
461 type Target = T;
deref(&self) -> &Self::Target462 fn deref(&self) -> &Self::Target {
463 unsafe { &*self.lock.c.get() }
464 }
465 }
466
467 impl<T: ?Sized> DerefMut for MutexGuard<'_, T> {
deref_mut(&mut self) -> &mut Self::Target468 fn deref_mut(&mut self) -> &mut Self::Target {
469 unsafe { &mut *self.lock.c.get() }
470 }
471 }
472
473 impl<T: ?Sized + fmt::Debug> fmt::Debug for MutexGuard<'_, T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result474 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
475 fmt::Debug::fmt(&**self, f)
476 }
477 }
478
479 impl<T: ?Sized + fmt::Display> fmt::Display for MutexGuard<'_, T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result480 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
481 fmt::Display::fmt(&**self, f)
482 }
483 }
484
485 // === impl OwnedMutexGuard ===
486
487 impl<T: ?Sized> Drop for OwnedMutexGuard<T> {
drop(&mut self)488 fn drop(&mut self) {
489 self.lock.s.release(1)
490 }
491 }
492
493 impl<T: ?Sized> Deref for OwnedMutexGuard<T> {
494 type Target = T;
deref(&self) -> &Self::Target495 fn deref(&self) -> &Self::Target {
496 unsafe { &*self.lock.c.get() }
497 }
498 }
499
500 impl<T: ?Sized> DerefMut for OwnedMutexGuard<T> {
deref_mut(&mut self) -> &mut Self::Target501 fn deref_mut(&mut self) -> &mut Self::Target {
502 unsafe { &mut *self.lock.c.get() }
503 }
504 }
505
506 impl<T: ?Sized + fmt::Debug> fmt::Debug for OwnedMutexGuard<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result507 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
508 fmt::Debug::fmt(&**self, f)
509 }
510 }
511
512 impl<T: ?Sized + fmt::Display> fmt::Display for OwnedMutexGuard<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result513 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
514 fmt::Display::fmt(&**self, f)
515 }
516 }
517