1 use crate::sync::batch_semaphore::{Semaphore, TryAcquireError};
2 use crate::sync::mutex::TryLockError;
3 #[cfg(all(tokio_unstable, feature = "tracing"))]
4 use crate::util::trace;
5 use std::cell::UnsafeCell;
6 use std::marker;
7 use std::marker::PhantomData;
8 use std::sync::Arc;
9
10 pub(crate) mod owned_read_guard;
11 pub(crate) mod owned_write_guard;
12 pub(crate) mod owned_write_guard_mapped;
13 pub(crate) mod read_guard;
14 pub(crate) mod write_guard;
15 pub(crate) mod write_guard_mapped;
16 pub(crate) use owned_read_guard::OwnedRwLockReadGuard;
17 pub(crate) use owned_write_guard::OwnedRwLockWriteGuard;
18 pub(crate) use owned_write_guard_mapped::OwnedRwLockMappedWriteGuard;
19 pub(crate) use read_guard::RwLockReadGuard;
20 pub(crate) use write_guard::RwLockWriteGuard;
21 pub(crate) use write_guard_mapped::RwLockMappedWriteGuard;
22
23 #[cfg(not(loom))]
24 const MAX_READS: u32 = std::u32::MAX >> 3;
25
26 #[cfg(loom)]
27 const MAX_READS: u32 = 10;
28
29 /// An asynchronous reader-writer lock.
30 ///
31 /// This type of lock allows a number of readers or at most one writer at any
32 /// point in time. The write portion of this lock typically allows modification
33 /// of the underlying data (exclusive access) and the read portion of this lock
34 /// typically allows for read-only access (shared access).
35 ///
36 /// In comparison, a [`Mutex`] does not distinguish between readers or writers
37 /// that acquire the lock, therefore causing any tasks waiting for the lock to
38 /// become available to yield. An `RwLock` will allow any number of readers to
39 /// acquire the lock as long as a writer is not holding the lock.
40 ///
41 /// The priority policy of Tokio's read-write lock is _fair_ (or
42 /// [_write-preferring_]), in order to ensure that readers cannot starve
43 /// writers. Fairness is ensured using a first-in, first-out queue for the tasks
44 /// awaiting the lock; if a task that wishes to acquire the write lock is at the
45 /// head of the queue, read locks will not be given out until the write lock has
46 /// been released. This is in contrast to the Rust standard library's
47 /// `std::sync::RwLock`, where the priority policy is dependent on the
48 /// operating system's implementation.
49 ///
50 /// The type parameter `T` represents the data that this lock protects. It is
51 /// required that `T` satisfies [`Send`] to be shared across threads. The RAII guards
52 /// returned from the locking methods implement [`Deref`](trait@std::ops::Deref)
53 /// (and [`DerefMut`](trait@std::ops::DerefMut)
54 /// for the `write` methods) to allow access to the content of the lock.
55 ///
56 /// # Examples
57 ///
58 /// ```
59 /// use tokio::sync::RwLock;
60 ///
61 /// #[tokio::main]
62 /// async fn main() {
63 /// let lock = RwLock::new(5);
64 ///
65 /// // many reader locks can be held at once
66 /// {
67 /// let r1 = lock.read().await;
68 /// let r2 = lock.read().await;
69 /// assert_eq!(*r1, 5);
70 /// assert_eq!(*r2, 5);
71 /// } // read locks are dropped at this point
72 ///
73 /// // only one write lock may be held, however
74 /// {
75 /// let mut w = lock.write().await;
76 /// *w += 1;
77 /// assert_eq!(*w, 6);
78 /// } // write lock is dropped here
79 /// }
80 /// ```
81 ///
82 /// [`Mutex`]: struct@super::Mutex
83 /// [`RwLock`]: struct@RwLock
84 /// [`RwLockReadGuard`]: struct@RwLockReadGuard
85 /// [`RwLockWriteGuard`]: struct@RwLockWriteGuard
86 /// [`Send`]: trait@std::marker::Send
87 /// [_write-preferring_]: https://en.wikipedia.org/wiki/Readers%E2%80%93writer_lock#Priority_policies
88 pub struct RwLock<T: ?Sized> {
89 #[cfg(all(tokio_unstable, feature = "tracing"))]
90 resource_span: tracing::Span,
91
92 // maximum number of concurrent readers
93 mr: u32,
94
95 //semaphore to coordinate read and write access to T
96 s: Semaphore,
97
98 //inner data T
99 c: UnsafeCell<T>,
100 }
101
102 #[test]
103 #[cfg(not(loom))]
bounds()104 fn bounds() {
105 fn check_send<T: Send>() {}
106 fn check_sync<T: Sync>() {}
107 fn check_unpin<T: Unpin>() {}
108 // This has to take a value, since the async fn's return type is unnameable.
109 fn check_send_sync_val<T: Send + Sync>(_t: T) {}
110
111 check_send::<RwLock<u32>>();
112 check_sync::<RwLock<u32>>();
113 check_unpin::<RwLock<u32>>();
114
115 check_send::<RwLockReadGuard<'_, u32>>();
116 check_sync::<RwLockReadGuard<'_, u32>>();
117 check_unpin::<RwLockReadGuard<'_, u32>>();
118
119 check_send::<OwnedRwLockReadGuard<u32, i32>>();
120 check_sync::<OwnedRwLockReadGuard<u32, i32>>();
121 check_unpin::<OwnedRwLockReadGuard<u32, i32>>();
122
123 check_send::<RwLockWriteGuard<'_, u32>>();
124 check_sync::<RwLockWriteGuard<'_, u32>>();
125 check_unpin::<RwLockWriteGuard<'_, u32>>();
126
127 check_send::<RwLockMappedWriteGuard<'_, u32>>();
128 check_sync::<RwLockMappedWriteGuard<'_, u32>>();
129 check_unpin::<RwLockMappedWriteGuard<'_, u32>>();
130
131 check_send::<OwnedRwLockWriteGuard<u32>>();
132 check_sync::<OwnedRwLockWriteGuard<u32>>();
133 check_unpin::<OwnedRwLockWriteGuard<u32>>();
134
135 check_send::<OwnedRwLockMappedWriteGuard<u32, i32>>();
136 check_sync::<OwnedRwLockMappedWriteGuard<u32, i32>>();
137 check_unpin::<OwnedRwLockMappedWriteGuard<u32, i32>>();
138
139 let rwlock = Arc::new(RwLock::new(0));
140 check_send_sync_val(rwlock.read());
141 check_send_sync_val(Arc::clone(&rwlock).read_owned());
142 check_send_sync_val(rwlock.write());
143 check_send_sync_val(Arc::clone(&rwlock).write_owned());
144 }
145
146 // As long as T: Send + Sync, it's fine to send and share RwLock<T> between threads.
147 // If T were not Send, sending and sharing a RwLock<T> would be bad, since you can access T through
148 // RwLock<T>.
149 unsafe impl<T> Send for RwLock<T> where T: ?Sized + Send {}
150 unsafe impl<T> Sync for RwLock<T> where T: ?Sized + Send + Sync {}
151 // NB: These impls need to be explicit since we're storing a raw pointer.
152 // Safety: Stores a raw pointer to `T`, so if `T` is `Sync`, the lock guard over
153 // `T` is `Send`.
154 unsafe impl<T> Send for RwLockReadGuard<'_, T> where T: ?Sized + Sync {}
155 unsafe impl<T> Sync for RwLockReadGuard<'_, T> where T: ?Sized + Send + Sync {}
156 // T is required to be `Send` because an OwnedRwLockReadGuard can be used to drop the value held in
157 // the RwLock, unlike RwLockReadGuard.
158 unsafe impl<T, U> Send for OwnedRwLockReadGuard<T, U>
159 where
160 T: ?Sized + Send + Sync,
161 U: ?Sized + Sync,
162 {
163 }
164 unsafe impl<T, U> Sync for OwnedRwLockReadGuard<T, U>
165 where
166 T: ?Sized + Send + Sync,
167 U: ?Sized + Send + Sync,
168 {
169 }
170 unsafe impl<T> Sync for RwLockWriteGuard<'_, T> where T: ?Sized + Send + Sync {}
171 unsafe impl<T> Sync for OwnedRwLockWriteGuard<T> where T: ?Sized + Send + Sync {}
172 unsafe impl<T> Sync for RwLockMappedWriteGuard<'_, T> where T: ?Sized + Send + Sync {}
173 unsafe impl<T, U> Sync for OwnedRwLockMappedWriteGuard<T, U>
174 where
175 T: ?Sized + Send + Sync,
176 U: ?Sized + Send + Sync,
177 {
178 }
179 // Safety: Stores a raw pointer to `T`, so if `T` is `Sync`, the lock guard over
180 // `T` is `Send` - but since this is also provides mutable access, we need to
181 // make sure that `T` is `Send` since its value can be sent across thread
182 // boundaries.
183 unsafe impl<T> Send for RwLockWriteGuard<'_, T> where T: ?Sized + Send + Sync {}
184 unsafe impl<T> Send for OwnedRwLockWriteGuard<T> where T: ?Sized + Send + Sync {}
185 unsafe impl<T> Send for RwLockMappedWriteGuard<'_, T> where T: ?Sized + Send + Sync {}
186 unsafe impl<T, U> Send for OwnedRwLockMappedWriteGuard<T, U>
187 where
188 T: ?Sized + Send + Sync,
189 U: ?Sized + Send + Sync,
190 {
191 }
192
193 impl<T: ?Sized> RwLock<T> {
194 /// Creates a new instance of an `RwLock<T>` which is unlocked.
195 ///
196 /// # Examples
197 ///
198 /// ```
199 /// use tokio::sync::RwLock;
200 ///
201 /// let lock = RwLock::new(5);
202 /// ```
203 #[track_caller]
new(value: T) -> RwLock<T> where T: Sized,204 pub fn new(value: T) -> RwLock<T>
205 where
206 T: Sized,
207 {
208 #[cfg(all(tokio_unstable, feature = "tracing"))]
209 let resource_span = {
210 let location = std::panic::Location::caller();
211 let resource_span = tracing::trace_span!(
212 "runtime.resource",
213 concrete_type = "RwLock",
214 kind = "Sync",
215 loc.file = location.file(),
216 loc.line = location.line(),
217 loc.col = location.column(),
218 );
219
220 resource_span.in_scope(|| {
221 tracing::trace!(
222 target: "runtime::resource::state_update",
223 max_readers = MAX_READS,
224 );
225
226 tracing::trace!(
227 target: "runtime::resource::state_update",
228 write_locked = false,
229 );
230
231 tracing::trace!(
232 target: "runtime::resource::state_update",
233 current_readers = 0,
234 );
235 });
236
237 resource_span
238 };
239
240 #[cfg(all(tokio_unstable, feature = "tracing"))]
241 let s = resource_span.in_scope(|| Semaphore::new(MAX_READS as usize));
242
243 #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
244 let s = Semaphore::new(MAX_READS as usize);
245
246 RwLock {
247 mr: MAX_READS,
248 c: UnsafeCell::new(value),
249 s,
250 #[cfg(all(tokio_unstable, feature = "tracing"))]
251 resource_span,
252 }
253 }
254
255 /// Creates a new instance of an `RwLock<T>` which is unlocked
256 /// and allows a maximum of `max_reads` concurrent readers.
257 ///
258 /// # Examples
259 ///
260 /// ```
261 /// use tokio::sync::RwLock;
262 ///
263 /// let lock = RwLock::with_max_readers(5, 1024);
264 /// ```
265 ///
266 /// # Panics
267 ///
268 /// Panics if `max_reads` is more than `u32::MAX >> 3`.
269 #[track_caller]
with_max_readers(value: T, max_reads: u32) -> RwLock<T> where T: Sized,270 pub fn with_max_readers(value: T, max_reads: u32) -> RwLock<T>
271 where
272 T: Sized,
273 {
274 assert!(
275 max_reads <= MAX_READS,
276 "a RwLock may not be created with more than {} readers",
277 MAX_READS
278 );
279
280 #[cfg(all(tokio_unstable, feature = "tracing"))]
281 let resource_span = {
282 let location = std::panic::Location::caller();
283
284 let resource_span = tracing::trace_span!(
285 "runtime.resource",
286 concrete_type = "RwLock",
287 kind = "Sync",
288 loc.file = location.file(),
289 loc.line = location.line(),
290 loc.col = location.column(),
291 );
292
293 resource_span.in_scope(|| {
294 tracing::trace!(
295 target: "runtime::resource::state_update",
296 max_readers = max_reads,
297 );
298
299 tracing::trace!(
300 target: "runtime::resource::state_update",
301 write_locked = false,
302 );
303
304 tracing::trace!(
305 target: "runtime::resource::state_update",
306 current_readers = 0,
307 );
308 });
309
310 resource_span
311 };
312
313 #[cfg(all(tokio_unstable, feature = "tracing"))]
314 let s = resource_span.in_scope(|| Semaphore::new(max_reads as usize));
315
316 #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
317 let s = Semaphore::new(max_reads as usize);
318
319 RwLock {
320 mr: max_reads,
321 c: UnsafeCell::new(value),
322 s,
323 #[cfg(all(tokio_unstable, feature = "tracing"))]
324 resource_span,
325 }
326 }
327
328 /// Creates a new instance of an `RwLock<T>` which is unlocked.
329 ///
330 /// # Examples
331 ///
332 /// ```
333 /// use tokio::sync::RwLock;
334 ///
335 /// static LOCK: RwLock<i32> = RwLock::const_new(5);
336 /// ```
337 #[cfg(not(all(loom, test)))]
const_new(value: T) -> RwLock<T> where T: Sized,338 pub const fn const_new(value: T) -> RwLock<T>
339 where
340 T: Sized,
341 {
342 RwLock {
343 mr: MAX_READS,
344 c: UnsafeCell::new(value),
345 s: Semaphore::const_new(MAX_READS as usize),
346 #[cfg(all(tokio_unstable, feature = "tracing"))]
347 resource_span: tracing::Span::none(),
348 }
349 }
350
351 /// Creates a new instance of an `RwLock<T>` which is unlocked
352 /// and allows a maximum of `max_reads` concurrent readers.
353 ///
354 /// # Examples
355 ///
356 /// ```
357 /// use tokio::sync::RwLock;
358 ///
359 /// static LOCK: RwLock<i32> = RwLock::const_with_max_readers(5, 1024);
360 /// ```
361 #[cfg(not(all(loom, test)))]
const_with_max_readers(value: T, max_reads: u32) -> RwLock<T> where T: Sized,362 pub const fn const_with_max_readers(value: T, max_reads: u32) -> RwLock<T>
363 where
364 T: Sized,
365 {
366 assert!(max_reads <= MAX_READS);
367
368 RwLock {
369 mr: max_reads,
370 c: UnsafeCell::new(value),
371 s: Semaphore::const_new(max_reads as usize),
372 #[cfg(all(tokio_unstable, feature = "tracing"))]
373 resource_span: tracing::Span::none(),
374 }
375 }
376
377 /// Locks this `RwLock` with shared read access, causing the current task
378 /// to yield until the lock has been acquired.
379 ///
380 /// The calling task will yield until there are no writers which hold the
381 /// lock. There may be other readers inside the lock when the task resumes.
382 ///
383 /// Note that under the priority policy of [`RwLock`], read locks are not
384 /// granted until prior write locks, to prevent starvation. Therefore
385 /// deadlock may occur if a read lock is held by the current task, a write
386 /// lock attempt is made, and then a subsequent read lock attempt is made
387 /// by the current task.
388 ///
389 /// Returns an RAII guard which will drop this read access of the `RwLock`
390 /// when dropped.
391 ///
392 /// # Cancel safety
393 ///
394 /// This method uses a queue to fairly distribute locks in the order they
395 /// were requested. Cancelling a call to `read` makes you lose your place in
396 /// the queue.
397 ///
398 /// # Examples
399 ///
400 /// ```
401 /// use std::sync::Arc;
402 /// use tokio::sync::RwLock;
403 ///
404 /// #[tokio::main]
405 /// async fn main() {
406 /// let lock = Arc::new(RwLock::new(1));
407 /// let c_lock = lock.clone();
408 ///
409 /// let n = lock.read().await;
410 /// assert_eq!(*n, 1);
411 ///
412 /// tokio::spawn(async move {
413 /// // While main has an active read lock, we acquire one too.
414 /// let r = c_lock.read().await;
415 /// assert_eq!(*r, 1);
416 /// }).await.expect("The spawned task has panicked");
417 ///
418 /// // Drop the guard after the spawned task finishes.
419 /// drop(n);
420 /// }
421 /// ```
read(&self) -> RwLockReadGuard<'_, T>422 pub async fn read(&self) -> RwLockReadGuard<'_, T> {
423 let acquire_fut = async {
424 self.s.acquire(1).await.unwrap_or_else(|_| {
425 // The semaphore was closed. but, we never explicitly close it, and we have a
426 // handle to it through the Arc, which means that this can never happen.
427 unreachable!()
428 });
429
430 RwLockReadGuard {
431 s: &self.s,
432 data: self.c.get(),
433 marker: PhantomData,
434 #[cfg(all(tokio_unstable, feature = "tracing"))]
435 resource_span: self.resource_span.clone(),
436 }
437 };
438
439 #[cfg(all(tokio_unstable, feature = "tracing"))]
440 let acquire_fut = trace::async_op(
441 move || acquire_fut,
442 self.resource_span.clone(),
443 "RwLock::read",
444 "poll",
445 false,
446 );
447
448 #[allow(clippy::let_and_return)] // this lint triggers when disabling tracing
449 let guard = acquire_fut.await;
450
451 #[cfg(all(tokio_unstable, feature = "tracing"))]
452 self.resource_span.in_scope(|| {
453 tracing::trace!(
454 target: "runtime::resource::state_update",
455 current_readers = 1,
456 current_readers.op = "add",
457 )
458 });
459
460 guard
461 }
462
463 /// Blockingly locks this `RwLock` with shared read access.
464 ///
465 /// This method is intended for use cases where you
466 /// need to use this rwlock in asynchronous code as well as in synchronous code.
467 ///
468 /// Returns an RAII guard which will drop the read access of this `RwLock` when dropped.
469 ///
470 /// # Panics
471 ///
472 /// This function panics if called within an asynchronous execution context.
473 ///
474 /// - If you find yourself in an asynchronous execution context and needing
475 /// to call some (synchronous) function which performs one of these
476 /// `blocking_` operations, then consider wrapping that call inside
477 /// [`spawn_blocking()`][crate::runtime::Handle::spawn_blocking]
478 /// (or [`block_in_place()`][crate::task::block_in_place]).
479 ///
480 /// # Examples
481 ///
482 /// ```
483 /// use std::sync::Arc;
484 /// use tokio::sync::RwLock;
485 ///
486 /// #[tokio::main]
487 /// async fn main() {
488 /// let rwlock = Arc::new(RwLock::new(1));
489 /// let mut write_lock = rwlock.write().await;
490 ///
491 /// let blocking_task = tokio::task::spawn_blocking({
492 /// let rwlock = Arc::clone(&rwlock);
493 /// move || {
494 /// // This shall block until the `write_lock` is released.
495 /// let read_lock = rwlock.blocking_read();
496 /// assert_eq!(*read_lock, 0);
497 /// }
498 /// });
499 ///
500 /// *write_lock -= 1;
501 /// drop(write_lock); // release the lock.
502 ///
503 /// // Await the completion of the blocking task.
504 /// blocking_task.await.unwrap();
505 ///
506 /// // Assert uncontended.
507 /// assert!(rwlock.try_write().is_ok());
508 /// }
509 /// ```
510 #[track_caller]
511 #[cfg(feature = "sync")]
blocking_read(&self) -> RwLockReadGuard<'_, T>512 pub fn blocking_read(&self) -> RwLockReadGuard<'_, T> {
513 crate::future::block_on(self.read())
514 }
515
516 /// Locks this `RwLock` with shared read access, causing the current task
517 /// to yield until the lock has been acquired.
518 ///
519 /// The calling task will yield until there are no writers which hold the
520 /// lock. There may be other readers inside the lock when the task resumes.
521 ///
522 /// This method is identical to [`RwLock::read`], except that the returned
523 /// guard references the `RwLock` with an [`Arc`] rather than by borrowing
524 /// it. Therefore, the `RwLock` must be wrapped in an `Arc` to call this
525 /// method, and the guard will live for the `'static` lifetime, as it keeps
526 /// the `RwLock` alive by holding an `Arc`.
527 ///
528 /// Note that under the priority policy of [`RwLock`], read locks are not
529 /// granted until prior write locks, to prevent starvation. Therefore
530 /// deadlock may occur if a read lock is held by the current task, a write
531 /// lock attempt is made, and then a subsequent read lock attempt is made
532 /// by the current task.
533 ///
534 /// Returns an RAII guard which will drop this read access of the `RwLock`
535 /// when dropped.
536 ///
537 /// # Cancel safety
538 ///
539 /// This method uses a queue to fairly distribute locks in the order they
540 /// were requested. Cancelling a call to `read_owned` makes you lose your
541 /// place in the queue.
542 ///
543 /// # Examples
544 ///
545 /// ```
546 /// use std::sync::Arc;
547 /// use tokio::sync::RwLock;
548 ///
549 /// #[tokio::main]
550 /// async fn main() {
551 /// let lock = Arc::new(RwLock::new(1));
552 /// let c_lock = lock.clone();
553 ///
554 /// let n = lock.read_owned().await;
555 /// assert_eq!(*n, 1);
556 ///
557 /// tokio::spawn(async move {
558 /// // While main has an active read lock, we acquire one too.
559 /// let r = c_lock.read_owned().await;
560 /// assert_eq!(*r, 1);
561 /// }).await.expect("The spawned task has panicked");
562 ///
563 /// // Drop the guard after the spawned task finishes.
564 /// drop(n);
565 ///}
566 /// ```
read_owned(self: Arc<Self>) -> OwnedRwLockReadGuard<T>567 pub async fn read_owned(self: Arc<Self>) -> OwnedRwLockReadGuard<T> {
568 #[cfg(all(tokio_unstable, feature = "tracing"))]
569 let resource_span = self.resource_span.clone();
570
571 let acquire_fut = async {
572 self.s.acquire(1).await.unwrap_or_else(|_| {
573 // The semaphore was closed. but, we never explicitly close it, and we have a
574 // handle to it through the Arc, which means that this can never happen.
575 unreachable!()
576 });
577
578 OwnedRwLockReadGuard {
579 #[cfg(all(tokio_unstable, feature = "tracing"))]
580 resource_span: self.resource_span.clone(),
581 data: self.c.get(),
582 lock: self,
583 _p: PhantomData,
584 }
585 };
586
587 #[cfg(all(tokio_unstable, feature = "tracing"))]
588 let acquire_fut = trace::async_op(
589 move || acquire_fut,
590 resource_span,
591 "RwLock::read_owned",
592 "poll",
593 false,
594 );
595
596 #[allow(clippy::let_and_return)] // this lint triggers when disabling tracing
597 let guard = acquire_fut.await;
598
599 #[cfg(all(tokio_unstable, feature = "tracing"))]
600 guard.resource_span.in_scope(|| {
601 tracing::trace!(
602 target: "runtime::resource::state_update",
603 current_readers = 1,
604 current_readers.op = "add",
605 )
606 });
607
608 guard
609 }
610
611 /// Attempts to acquire this `RwLock` with shared read access.
612 ///
613 /// If the access couldn't be acquired immediately, returns [`TryLockError`].
614 /// Otherwise, an RAII guard is returned which will release read access
615 /// when dropped.
616 ///
617 /// [`TryLockError`]: TryLockError
618 ///
619 /// # Examples
620 ///
621 /// ```
622 /// use std::sync::Arc;
623 /// use tokio::sync::RwLock;
624 ///
625 /// #[tokio::main]
626 /// async fn main() {
627 /// let lock = Arc::new(RwLock::new(1));
628 /// let c_lock = lock.clone();
629 ///
630 /// let v = lock.try_read().unwrap();
631 /// assert_eq!(*v, 1);
632 ///
633 /// tokio::spawn(async move {
634 /// // While main has an active read lock, we acquire one too.
635 /// let n = c_lock.read().await;
636 /// assert_eq!(*n, 1);
637 /// }).await.expect("The spawned task has panicked");
638 ///
639 /// // Drop the guard when spawned task finishes.
640 /// drop(v);
641 /// }
642 /// ```
try_read(&self) -> Result<RwLockReadGuard<'_, T>, TryLockError>643 pub fn try_read(&self) -> Result<RwLockReadGuard<'_, T>, TryLockError> {
644 match self.s.try_acquire(1) {
645 Ok(permit) => permit,
646 Err(TryAcquireError::NoPermits) => return Err(TryLockError(())),
647 Err(TryAcquireError::Closed) => unreachable!(),
648 }
649
650 let guard = RwLockReadGuard {
651 s: &self.s,
652 data: self.c.get(),
653 marker: marker::PhantomData,
654 #[cfg(all(tokio_unstable, feature = "tracing"))]
655 resource_span: self.resource_span.clone(),
656 };
657
658 #[cfg(all(tokio_unstable, feature = "tracing"))]
659 self.resource_span.in_scope(|| {
660 tracing::trace!(
661 target: "runtime::resource::state_update",
662 current_readers = 1,
663 current_readers.op = "add",
664 )
665 });
666
667 Ok(guard)
668 }
669
670 /// Attempts to acquire this `RwLock` with shared read access.
671 ///
672 /// If the access couldn't be acquired immediately, returns [`TryLockError`].
673 /// Otherwise, an RAII guard is returned which will release read access
674 /// when dropped.
675 ///
676 /// This method is identical to [`RwLock::try_read`], except that the
677 /// returned guard references the `RwLock` with an [`Arc`] rather than by
678 /// borrowing it. Therefore, the `RwLock` must be wrapped in an `Arc` to
679 /// call this method, and the guard will live for the `'static` lifetime,
680 /// as it keeps the `RwLock` alive by holding an `Arc`.
681 ///
682 /// [`TryLockError`]: TryLockError
683 ///
684 /// # Examples
685 ///
686 /// ```
687 /// use std::sync::Arc;
688 /// use tokio::sync::RwLock;
689 ///
690 /// #[tokio::main]
691 /// async fn main() {
692 /// let lock = Arc::new(RwLock::new(1));
693 /// let c_lock = lock.clone();
694 ///
695 /// let v = lock.try_read_owned().unwrap();
696 /// assert_eq!(*v, 1);
697 ///
698 /// tokio::spawn(async move {
699 /// // While main has an active read lock, we acquire one too.
700 /// let n = c_lock.read_owned().await;
701 /// assert_eq!(*n, 1);
702 /// }).await.expect("The spawned task has panicked");
703 ///
704 /// // Drop the guard when spawned task finishes.
705 /// drop(v);
706 /// }
707 /// ```
try_read_owned(self: Arc<Self>) -> Result<OwnedRwLockReadGuard<T>, TryLockError>708 pub fn try_read_owned(self: Arc<Self>) -> Result<OwnedRwLockReadGuard<T>, TryLockError> {
709 match self.s.try_acquire(1) {
710 Ok(permit) => permit,
711 Err(TryAcquireError::NoPermits) => return Err(TryLockError(())),
712 Err(TryAcquireError::Closed) => unreachable!(),
713 }
714
715 let guard = OwnedRwLockReadGuard {
716 #[cfg(all(tokio_unstable, feature = "tracing"))]
717 resource_span: self.resource_span.clone(),
718 data: self.c.get(),
719 lock: self,
720 _p: PhantomData,
721 };
722
723 #[cfg(all(tokio_unstable, feature = "tracing"))]
724 guard.resource_span.in_scope(|| {
725 tracing::trace!(
726 target: "runtime::resource::state_update",
727 current_readers = 1,
728 current_readers.op = "add",
729 )
730 });
731
732 Ok(guard)
733 }
734
735 /// Locks this `RwLock` with exclusive write access, causing the current
736 /// task to yield until the lock has been acquired.
737 ///
738 /// The calling task will yield while other writers or readers currently
739 /// have access to the lock.
740 ///
741 /// Returns an RAII guard which will drop the write access of this `RwLock`
742 /// when dropped.
743 ///
744 /// # Cancel safety
745 ///
746 /// This method uses a queue to fairly distribute locks in the order they
747 /// were requested. Cancelling a call to `write` makes you lose your place
748 /// in the queue.
749 ///
750 /// # Examples
751 ///
752 /// ```
753 /// use tokio::sync::RwLock;
754 ///
755 /// #[tokio::main]
756 /// async fn main() {
757 /// let lock = RwLock::new(1);
758 ///
759 /// let mut n = lock.write().await;
760 /// *n = 2;
761 ///}
762 /// ```
write(&self) -> RwLockWriteGuard<'_, T>763 pub async fn write(&self) -> RwLockWriteGuard<'_, T> {
764 let acquire_fut = async {
765 self.s.acquire(self.mr).await.unwrap_or_else(|_| {
766 // The semaphore was closed. but, we never explicitly close it, and we have a
767 // handle to it through the Arc, which means that this can never happen.
768 unreachable!()
769 });
770
771 RwLockWriteGuard {
772 permits_acquired: self.mr,
773 s: &self.s,
774 data: self.c.get(),
775 marker: marker::PhantomData,
776 #[cfg(all(tokio_unstable, feature = "tracing"))]
777 resource_span: self.resource_span.clone(),
778 }
779 };
780
781 #[cfg(all(tokio_unstable, feature = "tracing"))]
782 let acquire_fut = trace::async_op(
783 move || acquire_fut,
784 self.resource_span.clone(),
785 "RwLock::write",
786 "poll",
787 false,
788 );
789
790 #[allow(clippy::let_and_return)] // this lint triggers when disabling tracing
791 let guard = acquire_fut.await;
792
793 #[cfg(all(tokio_unstable, feature = "tracing"))]
794 self.resource_span.in_scope(|| {
795 tracing::trace!(
796 target: "runtime::resource::state_update",
797 write_locked = true,
798 write_locked.op = "override",
799 )
800 });
801
802 guard
803 }
804
805 /// Blockingly locks this `RwLock` with exclusive write access.
806 ///
807 /// This method is intended for use cases where you
808 /// need to use this rwlock in asynchronous code as well as in synchronous code.
809 ///
810 /// Returns an RAII guard which will drop the write access of this `RwLock` when dropped.
811 ///
812 /// # Panics
813 ///
814 /// This function panics if called within an asynchronous execution context.
815 ///
816 /// - If you find yourself in an asynchronous execution context and needing
817 /// to call some (synchronous) function which performs one of these
818 /// `blocking_` operations, then consider wrapping that call inside
819 /// [`spawn_blocking()`][crate::runtime::Handle::spawn_blocking]
820 /// (or [`block_in_place()`][crate::task::block_in_place]).
821 ///
822 /// # Examples
823 ///
824 /// ```
825 /// use std::sync::Arc;
826 /// use tokio::{sync::RwLock};
827 ///
828 /// #[tokio::main]
829 /// async fn main() {
830 /// let rwlock = Arc::new(RwLock::new(1));
831 /// let read_lock = rwlock.read().await;
832 ///
833 /// let blocking_task = tokio::task::spawn_blocking({
834 /// let rwlock = Arc::clone(&rwlock);
835 /// move || {
836 /// // This shall block until the `read_lock` is released.
837 /// let mut write_lock = rwlock.blocking_write();
838 /// *write_lock = 2;
839 /// }
840 /// });
841 ///
842 /// assert_eq!(*read_lock, 1);
843 /// // Release the last outstanding read lock.
844 /// drop(read_lock);
845 ///
846 /// // Await the completion of the blocking task.
847 /// blocking_task.await.unwrap();
848 ///
849 /// // Assert uncontended.
850 /// let read_lock = rwlock.try_read().unwrap();
851 /// assert_eq!(*read_lock, 2);
852 /// }
853 /// ```
854 #[track_caller]
855 #[cfg(feature = "sync")]
blocking_write(&self) -> RwLockWriteGuard<'_, T>856 pub fn blocking_write(&self) -> RwLockWriteGuard<'_, T> {
857 crate::future::block_on(self.write())
858 }
859
860 /// Locks this `RwLock` with exclusive write access, causing the current
861 /// task to yield until the lock has been acquired.
862 ///
863 /// The calling task will yield while other writers or readers currently
864 /// have access to the lock.
865 ///
866 /// This method is identical to [`RwLock::write`], except that the returned
867 /// guard references the `RwLock` with an [`Arc`] rather than by borrowing
868 /// it. Therefore, the `RwLock` must be wrapped in an `Arc` to call this
869 /// method, and the guard will live for the `'static` lifetime, as it keeps
870 /// the `RwLock` alive by holding an `Arc`.
871 ///
872 /// Returns an RAII guard which will drop the write access of this `RwLock`
873 /// when dropped.
874 ///
875 /// # Cancel safety
876 ///
877 /// This method uses a queue to fairly distribute locks in the order they
878 /// were requested. Cancelling a call to `write_owned` makes you lose your
879 /// place in the queue.
880 ///
881 /// # Examples
882 ///
883 /// ```
884 /// use std::sync::Arc;
885 /// use tokio::sync::RwLock;
886 ///
887 /// #[tokio::main]
888 /// async fn main() {
889 /// let lock = Arc::new(RwLock::new(1));
890 ///
891 /// let mut n = lock.write_owned().await;
892 /// *n = 2;
893 ///}
894 /// ```
write_owned(self: Arc<Self>) -> OwnedRwLockWriteGuard<T>895 pub async fn write_owned(self: Arc<Self>) -> OwnedRwLockWriteGuard<T> {
896 #[cfg(all(tokio_unstable, feature = "tracing"))]
897 let resource_span = self.resource_span.clone();
898
899 let acquire_fut = async {
900 self.s.acquire(self.mr).await.unwrap_or_else(|_| {
901 // The semaphore was closed. but, we never explicitly close it, and we have a
902 // handle to it through the Arc, which means that this can never happen.
903 unreachable!()
904 });
905
906 OwnedRwLockWriteGuard {
907 #[cfg(all(tokio_unstable, feature = "tracing"))]
908 resource_span: self.resource_span.clone(),
909 permits_acquired: self.mr,
910 data: self.c.get(),
911 lock: self,
912 _p: PhantomData,
913 }
914 };
915
916 #[cfg(all(tokio_unstable, feature = "tracing"))]
917 let acquire_fut = trace::async_op(
918 move || acquire_fut,
919 resource_span,
920 "RwLock::write_owned",
921 "poll",
922 false,
923 );
924
925 #[allow(clippy::let_and_return)] // this lint triggers when disabling tracing
926 let guard = acquire_fut.await;
927
928 #[cfg(all(tokio_unstable, feature = "tracing"))]
929 guard.resource_span.in_scope(|| {
930 tracing::trace!(
931 target: "runtime::resource::state_update",
932 write_locked = true,
933 write_locked.op = "override",
934 )
935 });
936
937 guard
938 }
939
940 /// Attempts to acquire this `RwLock` with exclusive write access.
941 ///
942 /// If the access couldn't be acquired immediately, returns [`TryLockError`].
943 /// Otherwise, an RAII guard is returned which will release write access
944 /// when dropped.
945 ///
946 /// [`TryLockError`]: TryLockError
947 ///
948 /// # Examples
949 ///
950 /// ```
951 /// use tokio::sync::RwLock;
952 ///
953 /// #[tokio::main]
954 /// async fn main() {
955 /// let rw = RwLock::new(1);
956 ///
957 /// let v = rw.read().await;
958 /// assert_eq!(*v, 1);
959 ///
960 /// assert!(rw.try_write().is_err());
961 /// }
962 /// ```
try_write(&self) -> Result<RwLockWriteGuard<'_, T>, TryLockError>963 pub fn try_write(&self) -> Result<RwLockWriteGuard<'_, T>, TryLockError> {
964 match self.s.try_acquire(self.mr) {
965 Ok(permit) => permit,
966 Err(TryAcquireError::NoPermits) => return Err(TryLockError(())),
967 Err(TryAcquireError::Closed) => unreachable!(),
968 }
969
970 let guard = RwLockWriteGuard {
971 permits_acquired: self.mr,
972 s: &self.s,
973 data: self.c.get(),
974 marker: marker::PhantomData,
975 #[cfg(all(tokio_unstable, feature = "tracing"))]
976 resource_span: self.resource_span.clone(),
977 };
978
979 #[cfg(all(tokio_unstable, feature = "tracing"))]
980 self.resource_span.in_scope(|| {
981 tracing::trace!(
982 target: "runtime::resource::state_update",
983 write_locked = true,
984 write_locked.op = "override",
985 )
986 });
987
988 Ok(guard)
989 }
990
991 /// Attempts to acquire this `RwLock` with exclusive write access.
992 ///
993 /// If the access couldn't be acquired immediately, returns [`TryLockError`].
994 /// Otherwise, an RAII guard is returned which will release write access
995 /// when dropped.
996 ///
997 /// This method is identical to [`RwLock::try_write`], except that the
998 /// returned guard references the `RwLock` with an [`Arc`] rather than by
999 /// borrowing it. Therefore, the `RwLock` must be wrapped in an `Arc` to
1000 /// call this method, and the guard will live for the `'static` lifetime,
1001 /// as it keeps the `RwLock` alive by holding an `Arc`.
1002 ///
1003 /// [`TryLockError`]: TryLockError
1004 ///
1005 /// # Examples
1006 ///
1007 /// ```
1008 /// use std::sync::Arc;
1009 /// use tokio::sync::RwLock;
1010 ///
1011 /// #[tokio::main]
1012 /// async fn main() {
1013 /// let rw = Arc::new(RwLock::new(1));
1014 ///
1015 /// let v = Arc::clone(&rw).read_owned().await;
1016 /// assert_eq!(*v, 1);
1017 ///
1018 /// assert!(rw.try_write_owned().is_err());
1019 /// }
1020 /// ```
try_write_owned(self: Arc<Self>) -> Result<OwnedRwLockWriteGuard<T>, TryLockError>1021 pub fn try_write_owned(self: Arc<Self>) -> Result<OwnedRwLockWriteGuard<T>, TryLockError> {
1022 match self.s.try_acquire(self.mr) {
1023 Ok(permit) => permit,
1024 Err(TryAcquireError::NoPermits) => return Err(TryLockError(())),
1025 Err(TryAcquireError::Closed) => unreachable!(),
1026 }
1027
1028 let guard = OwnedRwLockWriteGuard {
1029 #[cfg(all(tokio_unstable, feature = "tracing"))]
1030 resource_span: self.resource_span.clone(),
1031 permits_acquired: self.mr,
1032 data: self.c.get(),
1033 lock: self,
1034 _p: PhantomData,
1035 };
1036
1037 #[cfg(all(tokio_unstable, feature = "tracing"))]
1038 guard.resource_span.in_scope(|| {
1039 tracing::trace!(
1040 target: "runtime::resource::state_update",
1041 write_locked = true,
1042 write_locked.op = "override",
1043 )
1044 });
1045
1046 Ok(guard)
1047 }
1048
1049 /// Returns a mutable reference to the underlying data.
1050 ///
1051 /// Since this call borrows the `RwLock` mutably, no actual locking needs to
1052 /// take place -- the mutable borrow statically guarantees no locks exist.
1053 ///
1054 /// # Examples
1055 ///
1056 /// ```
1057 /// use tokio::sync::RwLock;
1058 ///
1059 /// fn main() {
1060 /// let mut lock = RwLock::new(1);
1061 ///
1062 /// let n = lock.get_mut();
1063 /// *n = 2;
1064 /// }
1065 /// ```
get_mut(&mut self) -> &mut T1066 pub fn get_mut(&mut self) -> &mut T {
1067 unsafe {
1068 // Safety: This is https://github.com/rust-lang/rust/pull/76936
1069 &mut *self.c.get()
1070 }
1071 }
1072
1073 /// Consumes the lock, returning the underlying data.
into_inner(self) -> T where T: Sized,1074 pub fn into_inner(self) -> T
1075 where
1076 T: Sized,
1077 {
1078 self.c.into_inner()
1079 }
1080 }
1081
1082 impl<T> From<T> for RwLock<T> {
from(s: T) -> Self1083 fn from(s: T) -> Self {
1084 Self::new(s)
1085 }
1086 }
1087
1088 impl<T: ?Sized> Default for RwLock<T>
1089 where
1090 T: Default,
1091 {
default() -> Self1092 fn default() -> Self {
1093 Self::new(T::default())
1094 }
1095 }
1096
1097 impl<T: ?Sized> std::fmt::Debug for RwLock<T>
1098 where
1099 T: std::fmt::Debug,
1100 {
fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result1101 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1102 let mut d = f.debug_struct("RwLock");
1103 match self.try_read() {
1104 Ok(inner) => d.field("data", &&*inner),
1105 Err(_) => d.field("data", &format_args!("<locked>")),
1106 };
1107 d.finish()
1108 }
1109 }
1110