• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #![warn(rust_2018_idioms)]
2 #![cfg(feature = "sync")]
3 
4 #[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
5 use wasm_bindgen_test::wasm_bindgen_test as test;
6 #[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
7 use wasm_bindgen_test::wasm_bindgen_test as maybe_tokio_test;
8 
9 #[cfg(not(all(target_family = "wasm", not(target_os = "wasi"))))]
10 use tokio::test as maybe_tokio_test;
11 
12 use std::task::Poll;
13 
14 use futures::future::FutureExt;
15 
16 use tokio::sync::{RwLock, RwLockWriteGuard};
17 use tokio_test::task::spawn;
18 use tokio_test::{assert_pending, assert_ready};
19 
20 #[test]
into_inner()21 fn into_inner() {
22     let rwlock = RwLock::new(42);
23     assert_eq!(rwlock.into_inner(), 42);
24 }
25 
26 // multiple reads should be Ready
27 #[test]
read_shared()28 fn read_shared() {
29     let rwlock = RwLock::new(100);
30 
31     let mut t1 = spawn(rwlock.read());
32     let _g1 = assert_ready!(t1.poll());
33     let mut t2 = spawn(rwlock.read());
34     let _g2 = assert_ready!(t2.poll());
35 }
36 
37 // When there is an active shared owner, exclusive access should not be possible
38 #[test]
write_shared_pending()39 fn write_shared_pending() {
40     let rwlock = RwLock::new(100);
41     let mut t1 = spawn(rwlock.read());
42 
43     let _g1 = assert_ready!(t1.poll());
44     let mut t2 = spawn(rwlock.write());
45     assert_pending!(t2.poll());
46 }
47 
48 // When there is an active exclusive owner, subsequent exclusive access should not be possible
49 #[test]
read_exclusive_pending()50 fn read_exclusive_pending() {
51     let rwlock = RwLock::new(100);
52     let mut t1 = spawn(rwlock.write());
53 
54     let _g1 = assert_ready!(t1.poll());
55     let mut t2 = spawn(rwlock.read());
56     assert_pending!(t2.poll());
57 }
58 
59 // If the max shared access is reached and subsequent shared access is pending
60 // should be made available when one of the shared accesses is dropped
61 #[test]
exhaust_reading()62 fn exhaust_reading() {
63     let rwlock = RwLock::with_max_readers(100, 1024);
64     let mut reads = Vec::new();
65     loop {
66         let mut t = spawn(rwlock.read());
67         match t.poll() {
68             Poll::Ready(guard) => reads.push(guard),
69             Poll::Pending => break,
70         }
71     }
72 
73     let mut t1 = spawn(rwlock.read());
74     assert_pending!(t1.poll());
75     let g2 = reads.pop().unwrap();
76     drop(g2);
77     assert!(t1.is_woken());
78     let _g1 = assert_ready!(t1.poll());
79 }
80 
81 // When there is an active exclusive owner, subsequent exclusive access should not be possible
82 #[test]
write_exclusive_pending()83 fn write_exclusive_pending() {
84     let rwlock = RwLock::new(100);
85     let mut t1 = spawn(rwlock.write());
86 
87     let _g1 = assert_ready!(t1.poll());
88     let mut t2 = spawn(rwlock.write());
89     assert_pending!(t2.poll());
90 }
91 
92 // When there is an active shared owner, exclusive access should be possible after shared is dropped
93 #[test]
write_shared_drop()94 fn write_shared_drop() {
95     let rwlock = RwLock::new(100);
96     let mut t1 = spawn(rwlock.read());
97 
98     let g1 = assert_ready!(t1.poll());
99     let mut t2 = spawn(rwlock.write());
100     assert_pending!(t2.poll());
101     drop(g1);
102     assert!(t2.is_woken());
103     let _g2 = assert_ready!(t2.poll());
104 }
105 
106 // when there is an active shared owner, and exclusive access is triggered,
107 // subsequent shared access should not be possible as write gathers all the available semaphore permits
108 #[test]
write_read_shared_pending()109 fn write_read_shared_pending() {
110     let rwlock = RwLock::new(100);
111     let mut t1 = spawn(rwlock.read());
112     let _g1 = assert_ready!(t1.poll());
113 
114     let mut t2 = spawn(rwlock.read());
115     let _g2 = assert_ready!(t2.poll());
116 
117     let mut t3 = spawn(rwlock.write());
118     assert_pending!(t3.poll());
119 
120     let mut t4 = spawn(rwlock.read());
121     assert_pending!(t4.poll());
122 }
123 
124 // when there is an active shared owner, and exclusive access is triggered,
125 // reading should be possible after pending exclusive access is dropped
126 #[test]
write_read_shared_drop_pending()127 fn write_read_shared_drop_pending() {
128     let rwlock = RwLock::new(100);
129     let mut t1 = spawn(rwlock.read());
130     let _g1 = assert_ready!(t1.poll());
131 
132     let mut t2 = spawn(rwlock.write());
133     assert_pending!(t2.poll());
134 
135     let mut t3 = spawn(rwlock.read());
136     assert_pending!(t3.poll());
137     drop(t2);
138 
139     assert!(t3.is_woken());
140     let _t3 = assert_ready!(t3.poll());
141 }
142 
143 // Acquire an RwLock nonexclusively by a single task
144 #[maybe_tokio_test]
read_uncontested()145 async fn read_uncontested() {
146     let rwlock = RwLock::new(100);
147     let result = *rwlock.read().await;
148 
149     assert_eq!(result, 100);
150 }
151 
152 // Acquire an uncontested RwLock in exclusive mode
153 #[maybe_tokio_test]
write_uncontested()154 async fn write_uncontested() {
155     let rwlock = RwLock::new(100);
156     let mut result = rwlock.write().await;
157     *result += 50;
158     assert_eq!(*result, 150);
159 }
160 
161 // RwLocks should be acquired in the order that their Futures are waited upon.
162 #[maybe_tokio_test]
write_order()163 async fn write_order() {
164     let rwlock = RwLock::<Vec<u32>>::new(vec![]);
165     let fut2 = rwlock.write().map(|mut guard| guard.push(2));
166     let fut1 = rwlock.write().map(|mut guard| guard.push(1));
167     fut1.await;
168     fut2.await;
169 
170     let g = rwlock.read().await;
171     assert_eq!(*g, vec![1, 2]);
172 }
173 
174 // A single RwLock is contested by tasks in multiple threads
175 #[cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi doesn't support threads
176 #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
multithreaded()177 async fn multithreaded() {
178     use futures::stream::{self, StreamExt};
179     use std::sync::Arc;
180     use tokio::sync::Barrier;
181 
182     let barrier = Arc::new(Barrier::new(5));
183     let rwlock = Arc::new(RwLock::<u32>::new(0));
184     let rwclone1 = rwlock.clone();
185     let rwclone2 = rwlock.clone();
186     let rwclone3 = rwlock.clone();
187     let rwclone4 = rwlock.clone();
188 
189     let b1 = barrier.clone();
190     tokio::spawn(async move {
191         stream::iter(0..1000)
192             .for_each(move |_| {
193                 let rwlock = rwclone1.clone();
194                 async move {
195                     let mut guard = rwlock.write().await;
196                     *guard += 2;
197                 }
198             })
199             .await;
200         b1.wait().await;
201     });
202 
203     let b2 = barrier.clone();
204     tokio::spawn(async move {
205         stream::iter(0..1000)
206             .for_each(move |_| {
207                 let rwlock = rwclone2.clone();
208                 async move {
209                     let mut guard = rwlock.write().await;
210                     *guard += 3;
211                 }
212             })
213             .await;
214         b2.wait().await;
215     });
216 
217     let b3 = barrier.clone();
218     tokio::spawn(async move {
219         stream::iter(0..1000)
220             .for_each(move |_| {
221                 let rwlock = rwclone3.clone();
222                 async move {
223                     let mut guard = rwlock.write().await;
224                     *guard += 5;
225                 }
226             })
227             .await;
228         b3.wait().await;
229     });
230 
231     let b4 = barrier.clone();
232     tokio::spawn(async move {
233         stream::iter(0..1000)
234             .for_each(move |_| {
235                 let rwlock = rwclone4.clone();
236                 async move {
237                     let mut guard = rwlock.write().await;
238                     *guard += 7;
239                 }
240             })
241             .await;
242         b4.wait().await;
243     });
244 
245     barrier.wait().await;
246     let g = rwlock.read().await;
247     assert_eq!(*g, 17_000);
248 }
249 
250 #[maybe_tokio_test]
try_write()251 async fn try_write() {
252     let lock = RwLock::new(0);
253     let read_guard = lock.read().await;
254     assert!(lock.try_write().is_err());
255     drop(read_guard);
256     assert!(lock.try_write().is_ok());
257 }
258 
259 #[test]
try_read_try_write()260 fn try_read_try_write() {
261     let lock: RwLock<usize> = RwLock::new(15);
262 
263     {
264         let rg1 = lock.try_read().unwrap();
265         assert_eq!(*rg1, 15);
266 
267         assert!(lock.try_write().is_err());
268 
269         let rg2 = lock.try_read().unwrap();
270         assert_eq!(*rg2, 15)
271     }
272 
273     {
274         let mut wg = lock.try_write().unwrap();
275         *wg = 1515;
276 
277         assert!(lock.try_read().is_err())
278     }
279 
280     assert_eq!(*lock.try_read().unwrap(), 1515);
281 }
282 
283 #[maybe_tokio_test]
downgrade_map()284 async fn downgrade_map() {
285     let lock = RwLock::new(0);
286     let write_guard = lock.write().await;
287     let mut read_t = spawn(lock.read());
288 
289     // We can't create a read when a write exists
290     assert_pending!(read_t.poll());
291 
292     // During the call to `f`, `read_t` doesn't have access yet.
293     let read_guard1 = RwLockWriteGuard::downgrade_map(write_guard, |v| {
294         assert_pending!(read_t.poll());
295         v
296     });
297 
298     // After the downgrade, `read_t` got the lock
299     let read_guard2 = assert_ready!(read_t.poll());
300 
301     // Ensure they're equal, as we return the original value
302     assert_eq!(&*read_guard1 as *const _, &*read_guard2 as *const _);
303 }
304 
305 #[maybe_tokio_test]
try_downgrade_map()306 async fn try_downgrade_map() {
307     let lock = RwLock::new(0);
308     let write_guard = lock.write().await;
309     let mut read_t = spawn(lock.read());
310 
311     // We can't create a read when a write exists
312     assert_pending!(read_t.poll());
313 
314     // During the call to `f`, `read_t` doesn't have access yet.
315     let write_guard = RwLockWriteGuard::try_downgrade_map(write_guard, |_| {
316         assert_pending!(read_t.poll());
317         None::<&()>
318     })
319     .expect_err("downgrade didn't fail");
320 
321     // After `f` returns `None`, `read_t` doesn't have access
322     assert_pending!(read_t.poll());
323 
324     // After `f` returns `Some`, `read_t` does have access
325     let read_guard1 = RwLockWriteGuard::try_downgrade_map(write_guard, |v| Some(v))
326         .expect("downgrade didn't succeed");
327     let read_guard2 = assert_ready!(read_t.poll());
328 
329     // Ensure they're equal, as we return the original value
330     assert_eq!(&*read_guard1 as *const _, &*read_guard2 as *const _);
331 }
332