• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2016 Amanieu d'Antras
2 //
3 // Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
4 // http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5 // http://opensource.org/licenses/MIT>, at your option. This file may not be
6 // copied, modified, or distributed except according to those terms.
7 
8 use crate::mutex::MutexGuard;
9 use crate::raw_mutex::{RawMutex, TOKEN_HANDOFF, TOKEN_NORMAL};
10 use crate::{deadlock, util};
11 use core::{
12     fmt, ptr,
13     sync::atomic::{AtomicPtr, Ordering},
14 };
15 use lock_api::RawMutex as RawMutex_;
16 use parking_lot_core::{self, ParkResult, RequeueOp, UnparkResult, DEFAULT_PARK_TOKEN};
17 use std::time::{Duration, Instant};
18 
19 /// A type indicating whether a timed wait on a condition variable returned
20 /// due to a time out or not.
21 #[derive(Debug, PartialEq, Eq, Copy, Clone)]
22 pub struct WaitTimeoutResult(bool);
23 
24 impl WaitTimeoutResult {
25     /// Returns whether the wait was known to have timed out.
26     #[inline]
timed_out(self) -> bool27     pub fn timed_out(self) -> bool {
28         self.0
29     }
30 }
31 
32 /// A Condition Variable
33 ///
34 /// Condition variables represent the ability to block a thread such that it
35 /// consumes no CPU time while waiting for an event to occur. Condition
36 /// variables are typically associated with a boolean predicate (a condition)
37 /// and a mutex. The predicate is always verified inside of the mutex before
38 /// determining that thread must block.
39 ///
40 /// Note that this module places one additional restriction over the system
41 /// condition variables: each condvar can be used with only one mutex at a
42 /// time. Any attempt to use multiple mutexes on the same condition variable
43 /// simultaneously will result in a runtime panic. However it is possible to
44 /// switch to a different mutex if there are no threads currently waiting on
45 /// the condition variable.
46 ///
47 /// # Differences from the standard library `Condvar`
48 ///
49 /// - No spurious wakeups: A wait will only return a non-timeout result if it
50 ///   was woken up by `notify_one` or `notify_all`.
51 /// - `Condvar::notify_all` will only wake up a single thread, the rest are
52 ///   requeued to wait for the `Mutex` to be unlocked by the thread that was
53 ///   woken up.
54 /// - Only requires 1 word of space, whereas the standard library boxes the
55 ///   `Condvar` due to platform limitations.
56 /// - Can be statically constructed (requires the `const_fn` nightly feature).
57 /// - Does not require any drop glue when dropped.
58 /// - Inline fast path for the uncontended case.
59 ///
60 /// # Examples
61 ///
62 /// ```
63 /// use parking_lot::{Mutex, Condvar};
64 /// use std::sync::Arc;
65 /// use std::thread;
66 ///
67 /// let pair = Arc::new((Mutex::new(false), Condvar::new()));
68 /// let pair2 = pair.clone();
69 ///
70 /// // Inside of our lock, spawn a new thread, and then wait for it to start
71 /// thread::spawn(move|| {
72 ///     let &(ref lock, ref cvar) = &*pair2;
73 ///     let mut started = lock.lock();
74 ///     *started = true;
75 ///     cvar.notify_one();
76 /// });
77 ///
78 /// // wait for the thread to start up
79 /// let &(ref lock, ref cvar) = &*pair;
80 /// let mut started = lock.lock();
81 /// if !*started {
82 ///     cvar.wait(&mut started);
83 /// }
84 /// // Note that we used an if instead of a while loop above. This is only
85 /// // possible because parking_lot's Condvar will never spuriously wake up.
86 /// // This means that wait() will only return after notify_one or notify_all is
87 /// // called.
88 /// ```
89 pub struct Condvar {
90     state: AtomicPtr<RawMutex>,
91 }
92 
93 impl Condvar {
94     /// Creates a new condition variable which is ready to be waited on and
95     /// notified.
96     #[inline]
new() -> Condvar97     pub const fn new() -> Condvar {
98         Condvar {
99             state: AtomicPtr::new(ptr::null_mut()),
100         }
101     }
102 
103     /// Wakes up one blocked thread on this condvar.
104     ///
105     /// Returns whether a thread was woken up.
106     ///
107     /// If there is a blocked thread on this condition variable, then it will
108     /// be woken up from its call to `wait` or `wait_timeout`. Calls to
109     /// `notify_one` are not buffered in any way.
110     ///
111     /// To wake up all threads, see `notify_all()`.
112     ///
113     /// # Examples
114     ///
115     /// ```
116     /// use parking_lot::Condvar;
117     ///
118     /// let condvar = Condvar::new();
119     ///
120     /// // do something with condvar, share it with other threads
121     ///
122     /// if !condvar.notify_one() {
123     ///     println!("Nobody was listening for this.");
124     /// }
125     /// ```
126     #[inline]
notify_one(&self) -> bool127     pub fn notify_one(&self) -> bool {
128         // Nothing to do if there are no waiting threads
129         let state = self.state.load(Ordering::Relaxed);
130         if state.is_null() {
131             return false;
132         }
133 
134         self.notify_one_slow(state)
135     }
136 
137     #[cold]
notify_one_slow(&self, mutex: *mut RawMutex) -> bool138     fn notify_one_slow(&self, mutex: *mut RawMutex) -> bool {
139         unsafe {
140             // Unpark one thread and requeue the rest onto the mutex
141             let from = self as *const _ as usize;
142             let to = mutex as usize;
143             let validate = || {
144                 // Make sure that our atomic state still points to the same
145                 // mutex. If not then it means that all threads on the current
146                 // mutex were woken up and a new waiting thread switched to a
147                 // different mutex. In that case we can get away with doing
148                 // nothing.
149                 if self.state.load(Ordering::Relaxed) != mutex {
150                     return RequeueOp::Abort;
151                 }
152 
153                 // Unpark one thread if the mutex is unlocked, otherwise just
154                 // requeue everything to the mutex. This is safe to do here
155                 // since unlocking the mutex when the parked bit is set requires
156                 // locking the queue. There is the possibility of a race if the
157                 // mutex gets locked after we check, but that doesn't matter in
158                 // this case.
159                 if (*mutex).mark_parked_if_locked() {
160                     RequeueOp::RequeueOne
161                 } else {
162                     RequeueOp::UnparkOne
163                 }
164             };
165             let callback = |_op, result: UnparkResult| {
166                 // Clear our state if there are no more waiting threads
167                 if !result.have_more_threads {
168                     self.state.store(ptr::null_mut(), Ordering::Relaxed);
169                 }
170                 TOKEN_NORMAL
171             };
172             let res = parking_lot_core::unpark_requeue(from, to, validate, callback);
173 
174             res.unparked_threads + res.requeued_threads != 0
175         }
176     }
177 
178     /// Wakes up all blocked threads on this condvar.
179     ///
180     /// Returns the number of threads woken up.
181     ///
182     /// This method will ensure that any current waiters on the condition
183     /// variable are awoken. Calls to `notify_all()` are not buffered in any
184     /// way.
185     ///
186     /// To wake up only one thread, see `notify_one()`.
187     #[inline]
notify_all(&self) -> usize188     pub fn notify_all(&self) -> usize {
189         // Nothing to do if there are no waiting threads
190         let state = self.state.load(Ordering::Relaxed);
191         if state.is_null() {
192             return 0;
193         }
194 
195         self.notify_all_slow(state)
196     }
197 
198     #[cold]
notify_all_slow(&self, mutex: *mut RawMutex) -> usize199     fn notify_all_slow(&self, mutex: *mut RawMutex) -> usize {
200         unsafe {
201             // Unpark one thread and requeue the rest onto the mutex
202             let from = self as *const _ as usize;
203             let to = mutex as usize;
204             let validate = || {
205                 // Make sure that our atomic state still points to the same
206                 // mutex. If not then it means that all threads on the current
207                 // mutex were woken up and a new waiting thread switched to a
208                 // different mutex. In that case we can get away with doing
209                 // nothing.
210                 if self.state.load(Ordering::Relaxed) != mutex {
211                     return RequeueOp::Abort;
212                 }
213 
214                 // Clear our state since we are going to unpark or requeue all
215                 // threads.
216                 self.state.store(ptr::null_mut(), Ordering::Relaxed);
217 
218                 // Unpark one thread if the mutex is unlocked, otherwise just
219                 // requeue everything to the mutex. This is safe to do here
220                 // since unlocking the mutex when the parked bit is set requires
221                 // locking the queue. There is the possibility of a race if the
222                 // mutex gets locked after we check, but that doesn't matter in
223                 // this case.
224                 if (*mutex).mark_parked_if_locked() {
225                     RequeueOp::RequeueAll
226                 } else {
227                     RequeueOp::UnparkOneRequeueRest
228                 }
229             };
230             let callback = |op, result: UnparkResult| {
231                 // If we requeued threads to the mutex, mark it as having
232                 // parked threads. The RequeueAll case is already handled above.
233                 if op == RequeueOp::UnparkOneRequeueRest && result.requeued_threads != 0 {
234                     (*mutex).mark_parked();
235                 }
236                 TOKEN_NORMAL
237             };
238             let res = parking_lot_core::unpark_requeue(from, to, validate, callback);
239 
240             res.unparked_threads + res.requeued_threads
241         }
242     }
243 
244     /// Blocks the current thread until this condition variable receives a
245     /// notification.
246     ///
247     /// This function will atomically unlock the mutex specified (represented by
248     /// `mutex_guard`) and block the current thread. This means that any calls
249     /// to `notify_*()` which happen logically after the mutex is unlocked are
250     /// candidates to wake this thread up. When this function call returns, the
251     /// lock specified will have been re-acquired.
252     ///
253     /// # Panics
254     ///
255     /// This function will panic if another thread is waiting on the `Condvar`
256     /// with a different `Mutex` object.
257     #[inline]
wait<T: ?Sized>(&self, mutex_guard: &mut MutexGuard<'_, T>)258     pub fn wait<T: ?Sized>(&self, mutex_guard: &mut MutexGuard<'_, T>) {
259         self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, None);
260     }
261 
262     /// Waits on this condition variable for a notification, timing out after
263     /// the specified time instant.
264     ///
265     /// The semantics of this function are equivalent to `wait()` except that
266     /// the thread will be blocked roughly until `timeout` is reached. This
267     /// method should not be used for precise timing due to anomalies such as
268     /// preemption or platform differences that may not cause the maximum
269     /// amount of time waited to be precisely `timeout`.
270     ///
271     /// Note that the best effort is made to ensure that the time waited is
272     /// measured with a monotonic clock, and not affected by the changes made to
273     /// the system time.
274     ///
275     /// The returned `WaitTimeoutResult` value indicates if the timeout is
276     /// known to have elapsed.
277     ///
278     /// Like `wait`, the lock specified will be re-acquired when this function
279     /// returns, regardless of whether the timeout elapsed or not.
280     ///
281     /// # Panics
282     ///
283     /// This function will panic if another thread is waiting on the `Condvar`
284     /// with a different `Mutex` object.
285     #[inline]
wait_until<T: ?Sized>( &self, mutex_guard: &mut MutexGuard<'_, T>, timeout: Instant, ) -> WaitTimeoutResult286     pub fn wait_until<T: ?Sized>(
287         &self,
288         mutex_guard: &mut MutexGuard<'_, T>,
289         timeout: Instant,
290     ) -> WaitTimeoutResult {
291         self.wait_until_internal(
292             unsafe { MutexGuard::mutex(mutex_guard).raw() },
293             Some(timeout),
294         )
295     }
296 
297     // This is a non-generic function to reduce the monomorphization cost of
298     // using `wait_until`.
wait_until_internal(&self, mutex: &RawMutex, timeout: Option<Instant>) -> WaitTimeoutResult299     fn wait_until_internal(&self, mutex: &RawMutex, timeout: Option<Instant>) -> WaitTimeoutResult {
300         unsafe {
301             let result;
302             let mut bad_mutex = false;
303             let mut requeued = false;
304             {
305                 let addr = self as *const _ as usize;
306                 let lock_addr = mutex as *const _ as *mut _;
307                 let validate = || {
308                     // Ensure we don't use two different mutexes with the same
309                     // Condvar at the same time. This is done while locked to
310                     // avoid races with notify_one
311                     let state = self.state.load(Ordering::Relaxed);
312                     if state.is_null() {
313                         self.state.store(lock_addr, Ordering::Relaxed);
314                     } else if state != lock_addr {
315                         bad_mutex = true;
316                         return false;
317                     }
318                     true
319                 };
320                 let before_sleep = || {
321                     // Unlock the mutex before sleeping...
322                     mutex.unlock();
323                 };
324                 let timed_out = |k, was_last_thread| {
325                     // If we were requeued to a mutex, then we did not time out.
326                     // We'll just park ourselves on the mutex again when we try
327                     // to lock it later.
328                     requeued = k != addr;
329 
330                     // If we were the last thread on the queue then we need to
331                     // clear our state. This is normally done by the
332                     // notify_{one,all} functions when not timing out.
333                     if !requeued && was_last_thread {
334                         self.state.store(ptr::null_mut(), Ordering::Relaxed);
335                     }
336                 };
337                 result = parking_lot_core::park(
338                     addr,
339                     validate,
340                     before_sleep,
341                     timed_out,
342                     DEFAULT_PARK_TOKEN,
343                     timeout,
344                 );
345             }
346 
347             // Panic if we tried to use multiple mutexes with a Condvar. Note
348             // that at this point the MutexGuard is still locked. It will be
349             // unlocked by the unwinding logic.
350             if bad_mutex {
351                 panic!("attempted to use a condition variable with more than one mutex");
352             }
353 
354             // ... and re-lock it once we are done sleeping
355             if result == ParkResult::Unparked(TOKEN_HANDOFF) {
356                 deadlock::acquire_resource(mutex as *const _ as usize);
357             } else {
358                 mutex.lock();
359             }
360 
361             WaitTimeoutResult(!(result.is_unparked() || requeued))
362         }
363     }
364 
365     /// Waits on this condition variable for a notification, timing out after a
366     /// specified duration.
367     ///
368     /// The semantics of this function are equivalent to `wait()` except that
369     /// the thread will be blocked for roughly no longer than `timeout`. This
370     /// method should not be used for precise timing due to anomalies such as
371     /// preemption or platform differences that may not cause the maximum
372     /// amount of time waited to be precisely `timeout`.
373     ///
374     /// Note that the best effort is made to ensure that the time waited is
375     /// measured with a monotonic clock, and not affected by the changes made to
376     /// the system time.
377     ///
378     /// The returned `WaitTimeoutResult` value indicates if the timeout is
379     /// known to have elapsed.
380     ///
381     /// Like `wait`, the lock specified will be re-acquired when this function
382     /// returns, regardless of whether the timeout elapsed or not.
383     #[inline]
wait_for<T: ?Sized>( &self, mutex_guard: &mut MutexGuard<'_, T>, timeout: Duration, ) -> WaitTimeoutResult384     pub fn wait_for<T: ?Sized>(
385         &self,
386         mutex_guard: &mut MutexGuard<'_, T>,
387         timeout: Duration,
388     ) -> WaitTimeoutResult {
389         let deadline = util::to_deadline(timeout);
390         self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, deadline)
391     }
392 }
393 
394 impl Default for Condvar {
395     #[inline]
default() -> Condvar396     fn default() -> Condvar {
397         Condvar::new()
398     }
399 }
400 
401 impl fmt::Debug for Condvar {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result402     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
403         f.pad("Condvar { .. }")
404     }
405 }
406 
407 #[cfg(test)]
408 mod tests {
409     use crate::{Condvar, Mutex, MutexGuard};
410     use std::sync::mpsc::channel;
411     use std::sync::Arc;
412     use std::thread;
413     use std::time::Duration;
414     use std::time::Instant;
415 
416     #[test]
smoke()417     fn smoke() {
418         let c = Condvar::new();
419         c.notify_one();
420         c.notify_all();
421     }
422 
423     #[test]
notify_one()424     fn notify_one() {
425         let m = Arc::new(Mutex::new(()));
426         let m2 = m.clone();
427         let c = Arc::new(Condvar::new());
428         let c2 = c.clone();
429 
430         let mut g = m.lock();
431         let _t = thread::spawn(move || {
432             let _g = m2.lock();
433             c2.notify_one();
434         });
435         c.wait(&mut g);
436     }
437 
438     #[test]
notify_all()439     fn notify_all() {
440         const N: usize = 10;
441 
442         let data = Arc::new((Mutex::new(0), Condvar::new()));
443         let (tx, rx) = channel();
444         for _ in 0..N {
445             let data = data.clone();
446             let tx = tx.clone();
447             thread::spawn(move || {
448                 let &(ref lock, ref cond) = &*data;
449                 let mut cnt = lock.lock();
450                 *cnt += 1;
451                 if *cnt == N {
452                     tx.send(()).unwrap();
453                 }
454                 while *cnt != 0 {
455                     cond.wait(&mut cnt);
456                 }
457                 tx.send(()).unwrap();
458             });
459         }
460         drop(tx);
461 
462         let &(ref lock, ref cond) = &*data;
463         rx.recv().unwrap();
464         let mut cnt = lock.lock();
465         *cnt = 0;
466         cond.notify_all();
467         drop(cnt);
468 
469         for _ in 0..N {
470             rx.recv().unwrap();
471         }
472     }
473 
474     #[test]
notify_one_return_true()475     fn notify_one_return_true() {
476         let m = Arc::new(Mutex::new(()));
477         let m2 = m.clone();
478         let c = Arc::new(Condvar::new());
479         let c2 = c.clone();
480 
481         let mut g = m.lock();
482         let _t = thread::spawn(move || {
483             let _g = m2.lock();
484             assert!(c2.notify_one());
485         });
486         c.wait(&mut g);
487     }
488 
489     #[test]
notify_one_return_false()490     fn notify_one_return_false() {
491         let m = Arc::new(Mutex::new(()));
492         let c = Arc::new(Condvar::new());
493 
494         let _t = thread::spawn(move || {
495             let _g = m.lock();
496             assert!(!c.notify_one());
497         });
498     }
499 
500     #[test]
notify_all_return()501     fn notify_all_return() {
502         const N: usize = 10;
503 
504         let data = Arc::new((Mutex::new(0), Condvar::new()));
505         let (tx, rx) = channel();
506         for _ in 0..N {
507             let data = data.clone();
508             let tx = tx.clone();
509             thread::spawn(move || {
510                 let &(ref lock, ref cond) = &*data;
511                 let mut cnt = lock.lock();
512                 *cnt += 1;
513                 if *cnt == N {
514                     tx.send(()).unwrap();
515                 }
516                 while *cnt != 0 {
517                     cond.wait(&mut cnt);
518                 }
519                 tx.send(()).unwrap();
520             });
521         }
522         drop(tx);
523 
524         let &(ref lock, ref cond) = &*data;
525         rx.recv().unwrap();
526         let mut cnt = lock.lock();
527         *cnt = 0;
528         assert_eq!(cond.notify_all(), N);
529         drop(cnt);
530 
531         for _ in 0..N {
532             rx.recv().unwrap();
533         }
534 
535         assert_eq!(cond.notify_all(), 0);
536     }
537 
538     #[test]
wait_for()539     fn wait_for() {
540         let m = Arc::new(Mutex::new(()));
541         let m2 = m.clone();
542         let c = Arc::new(Condvar::new());
543         let c2 = c.clone();
544 
545         let mut g = m.lock();
546         let no_timeout = c.wait_for(&mut g, Duration::from_millis(1));
547         assert!(no_timeout.timed_out());
548 
549         let _t = thread::spawn(move || {
550             let _g = m2.lock();
551             c2.notify_one();
552         });
553         let timeout_res = c.wait_for(&mut g, Duration::from_secs(u64::max_value()));
554         assert!(!timeout_res.timed_out());
555 
556         drop(g);
557     }
558 
559     #[test]
wait_until()560     fn wait_until() {
561         let m = Arc::new(Mutex::new(()));
562         let m2 = m.clone();
563         let c = Arc::new(Condvar::new());
564         let c2 = c.clone();
565 
566         let mut g = m.lock();
567         let no_timeout = c.wait_until(&mut g, Instant::now() + Duration::from_millis(1));
568         assert!(no_timeout.timed_out());
569         let _t = thread::spawn(move || {
570             let _g = m2.lock();
571             c2.notify_one();
572         });
573         let timeout_res = c.wait_until(
574             &mut g,
575             Instant::now() + Duration::from_millis(u32::max_value() as u64),
576         );
577         assert!(!timeout_res.timed_out());
578         drop(g);
579     }
580 
581     #[test]
582     #[should_panic]
two_mutexes()583     fn two_mutexes() {
584         let m = Arc::new(Mutex::new(()));
585         let m2 = m.clone();
586         let m3 = Arc::new(Mutex::new(()));
587         let c = Arc::new(Condvar::new());
588         let c2 = c.clone();
589 
590         // Make sure we don't leave the child thread dangling
591         struct PanicGuard<'a>(&'a Condvar);
592         impl<'a> Drop for PanicGuard<'a> {
593             fn drop(&mut self) {
594                 self.0.notify_one();
595             }
596         }
597 
598         let (tx, rx) = channel();
599         let g = m.lock();
600         let _t = thread::spawn(move || {
601             let mut g = m2.lock();
602             tx.send(()).unwrap();
603             c2.wait(&mut g);
604         });
605         drop(g);
606         rx.recv().unwrap();
607         let _g = m.lock();
608         let _guard = PanicGuard(&*c);
609         c.wait(&mut m3.lock());
610     }
611 
612     #[test]
two_mutexes_disjoint()613     fn two_mutexes_disjoint() {
614         let m = Arc::new(Mutex::new(()));
615         let m2 = m.clone();
616         let m3 = Arc::new(Mutex::new(()));
617         let c = Arc::new(Condvar::new());
618         let c2 = c.clone();
619 
620         let mut g = m.lock();
621         let _t = thread::spawn(move || {
622             let _g = m2.lock();
623             c2.notify_one();
624         });
625         c.wait(&mut g);
626         drop(g);
627 
628         let _ = c.wait_for(&mut m3.lock(), Duration::from_millis(1));
629     }
630 
631     #[test]
test_debug_condvar()632     fn test_debug_condvar() {
633         let c = Condvar::new();
634         assert_eq!(format!("{:?}", c), "Condvar { .. }");
635     }
636 
637     #[test]
test_condvar_requeue()638     fn test_condvar_requeue() {
639         let m = Arc::new(Mutex::new(()));
640         let m2 = m.clone();
641         let c = Arc::new(Condvar::new());
642         let c2 = c.clone();
643         let t = thread::spawn(move || {
644             let mut g = m2.lock();
645             c2.wait(&mut g);
646         });
647 
648         let mut g = m.lock();
649         while !c.notify_one() {
650             // Wait for the thread to get into wait()
651             MutexGuard::bump(&mut g);
652             // Yield, so the other thread gets a chance to do something.
653             // (At least Miri needs this, because it doesn't preempt threads.)
654             thread::yield_now();
655         }
656         // The thread should have been requeued to the mutex, which we wake up now.
657         drop(g);
658         t.join().unwrap();
659     }
660 
661     #[test]
test_issue_129()662     fn test_issue_129() {
663         let locks = Arc::new((Mutex::new(()), Condvar::new()));
664 
665         let (tx, rx) = channel();
666         for _ in 0..4 {
667             let locks = locks.clone();
668             let tx = tx.clone();
669             thread::spawn(move || {
670                 let mut guard = locks.0.lock();
671                 locks.1.wait(&mut guard);
672                 locks.1.wait_for(&mut guard, Duration::from_millis(1));
673                 locks.1.notify_one();
674                 tx.send(()).unwrap();
675             });
676         }
677 
678         thread::sleep(Duration::from_millis(100));
679         locks.1.notify_one();
680 
681         for _ in 0..4 {
682             assert_eq!(rx.recv_timeout(Duration::from_millis(500)), Ok(()));
683         }
684     }
685 }
686 
687 /// This module contains an integration test that is heavily inspired from WebKit's own integration
688 /// tests for it's own Condvar.
689 #[cfg(test)]
690 mod webkit_queue_test {
691     use crate::{Condvar, Mutex, MutexGuard};
692     use std::{collections::VecDeque, sync::Arc, thread, time::Duration};
693 
694     #[derive(Clone, Copy)]
695     enum Timeout {
696         Bounded(Duration),
697         Forever,
698     }
699 
700     #[derive(Clone, Copy)]
701     enum NotifyStyle {
702         One,
703         All,
704     }
705 
706     struct Queue {
707         items: VecDeque<usize>,
708         should_continue: bool,
709     }
710 
711     impl Queue {
new() -> Self712         fn new() -> Self {
713             Self {
714                 items: VecDeque::new(),
715                 should_continue: true,
716             }
717         }
718     }
719 
wait<T: ?Sized>( condition: &Condvar, lock: &mut MutexGuard<'_, T>, predicate: impl Fn(&mut MutexGuard<'_, T>) -> bool, timeout: &Timeout, )720     fn wait<T: ?Sized>(
721         condition: &Condvar,
722         lock: &mut MutexGuard<'_, T>,
723         predicate: impl Fn(&mut MutexGuard<'_, T>) -> bool,
724         timeout: &Timeout,
725     ) {
726         while !predicate(lock) {
727             match timeout {
728                 Timeout::Forever => condition.wait(lock),
729                 Timeout::Bounded(bound) => {
730                     condition.wait_for(lock, *bound);
731                 }
732             }
733         }
734     }
735 
notify(style: NotifyStyle, condition: &Condvar, should_notify: bool)736     fn notify(style: NotifyStyle, condition: &Condvar, should_notify: bool) {
737         match style {
738             NotifyStyle::One => {
739                 condition.notify_one();
740             }
741             NotifyStyle::All => {
742                 if should_notify {
743                     condition.notify_all();
744                 }
745             }
746         }
747     }
748 
run_queue_test( num_producers: usize, num_consumers: usize, max_queue_size: usize, messages_per_producer: usize, notify_style: NotifyStyle, timeout: Timeout, delay: Duration, )749     fn run_queue_test(
750         num_producers: usize,
751         num_consumers: usize,
752         max_queue_size: usize,
753         messages_per_producer: usize,
754         notify_style: NotifyStyle,
755         timeout: Timeout,
756         delay: Duration,
757     ) {
758         let input_queue = Arc::new(Mutex::new(Queue::new()));
759         let empty_condition = Arc::new(Condvar::new());
760         let full_condition = Arc::new(Condvar::new());
761 
762         let output_vec = Arc::new(Mutex::new(vec![]));
763 
764         let consumers = (0..num_consumers)
765             .map(|_| {
766                 consumer_thread(
767                     input_queue.clone(),
768                     empty_condition.clone(),
769                     full_condition.clone(),
770                     timeout,
771                     notify_style,
772                     output_vec.clone(),
773                     max_queue_size,
774                 )
775             })
776             .collect::<Vec<_>>();
777         let producers = (0..num_producers)
778             .map(|_| {
779                 producer_thread(
780                     messages_per_producer,
781                     input_queue.clone(),
782                     empty_condition.clone(),
783                     full_condition.clone(),
784                     timeout,
785                     notify_style,
786                     max_queue_size,
787                 )
788             })
789             .collect::<Vec<_>>();
790 
791         thread::sleep(delay);
792 
793         for producer in producers.into_iter() {
794             producer.join().expect("Producer thread panicked");
795         }
796 
797         {
798             let mut input_queue = input_queue.lock();
799             input_queue.should_continue = false;
800         }
801         empty_condition.notify_all();
802 
803         for consumer in consumers.into_iter() {
804             consumer.join().expect("Consumer thread panicked");
805         }
806 
807         let mut output_vec = output_vec.lock();
808         assert_eq!(output_vec.len(), num_producers * messages_per_producer);
809         output_vec.sort();
810         for msg_idx in 0..messages_per_producer {
811             for producer_idx in 0..num_producers {
812                 assert_eq!(msg_idx, output_vec[msg_idx * num_producers + producer_idx]);
813             }
814         }
815     }
816 
consumer_thread( input_queue: Arc<Mutex<Queue>>, empty_condition: Arc<Condvar>, full_condition: Arc<Condvar>, timeout: Timeout, notify_style: NotifyStyle, output_queue: Arc<Mutex<Vec<usize>>>, max_queue_size: usize, ) -> thread::JoinHandle<()>817     fn consumer_thread(
818         input_queue: Arc<Mutex<Queue>>,
819         empty_condition: Arc<Condvar>,
820         full_condition: Arc<Condvar>,
821         timeout: Timeout,
822         notify_style: NotifyStyle,
823         output_queue: Arc<Mutex<Vec<usize>>>,
824         max_queue_size: usize,
825     ) -> thread::JoinHandle<()> {
826         thread::spawn(move || loop {
827             let (should_notify, result) = {
828                 let mut queue = input_queue.lock();
829                 wait(
830                     &*empty_condition,
831                     &mut queue,
832                     |state| -> bool { !state.items.is_empty() || !state.should_continue },
833                     &timeout,
834                 );
835                 if queue.items.is_empty() && !queue.should_continue {
836                     return;
837                 }
838                 let should_notify = queue.items.len() == max_queue_size;
839                 let result = queue.items.pop_front();
840                 std::mem::drop(queue);
841                 (should_notify, result)
842             };
843             notify(notify_style, &*full_condition, should_notify);
844 
845             if let Some(result) = result {
846                 output_queue.lock().push(result);
847             }
848         })
849     }
850 
producer_thread( num_messages: usize, queue: Arc<Mutex<Queue>>, empty_condition: Arc<Condvar>, full_condition: Arc<Condvar>, timeout: Timeout, notify_style: NotifyStyle, max_queue_size: usize, ) -> thread::JoinHandle<()>851     fn producer_thread(
852         num_messages: usize,
853         queue: Arc<Mutex<Queue>>,
854         empty_condition: Arc<Condvar>,
855         full_condition: Arc<Condvar>,
856         timeout: Timeout,
857         notify_style: NotifyStyle,
858         max_queue_size: usize,
859     ) -> thread::JoinHandle<()> {
860         thread::spawn(move || {
861             for message in 0..num_messages {
862                 let should_notify = {
863                     let mut queue = queue.lock();
864                     wait(
865                         &*full_condition,
866                         &mut queue,
867                         |state| state.items.len() < max_queue_size,
868                         &timeout,
869                     );
870                     let should_notify = queue.items.is_empty();
871                     queue.items.push_back(message);
872                     std::mem::drop(queue);
873                     should_notify
874                 };
875                 notify(notify_style, &*empty_condition, should_notify);
876             }
877         })
878     }
879 
880     macro_rules! run_queue_tests {
881         ( $( $name:ident(
882             num_producers: $num_producers:expr,
883             num_consumers: $num_consumers:expr,
884             max_queue_size: $max_queue_size:expr,
885             messages_per_producer: $messages_per_producer:expr,
886             notification_style: $notification_style:expr,
887             timeout: $timeout:expr,
888             delay_seconds: $delay_seconds:expr);
889         )* ) => {
890             $(#[test]
891             fn $name() {
892                 let delay = Duration::from_secs($delay_seconds);
893                 run_queue_test(
894                     $num_producers,
895                     $num_consumers,
896                     $max_queue_size,
897                     $messages_per_producer,
898                     $notification_style,
899                     $timeout,
900                     delay,
901                     );
902             })*
903         };
904     }
905 
906     run_queue_tests! {
907         sanity_check_queue(
908             num_producers: 1,
909             num_consumers: 1,
910             max_queue_size: 1,
911             messages_per_producer: 100_000,
912             notification_style: NotifyStyle::All,
913             timeout: Timeout::Bounded(Duration::from_secs(1)),
914             delay_seconds: 0
915         );
916         sanity_check_queue_timeout(
917             num_producers: 1,
918             num_consumers: 1,
919             max_queue_size: 1,
920             messages_per_producer: 100_000,
921             notification_style: NotifyStyle::All,
922             timeout: Timeout::Forever,
923             delay_seconds: 0
924         );
925         new_test_without_timeout_5(
926             num_producers: 1,
927             num_consumers: 5,
928             max_queue_size: 1,
929             messages_per_producer: 100_000,
930             notification_style: NotifyStyle::All,
931             timeout: Timeout::Forever,
932             delay_seconds: 0
933         );
934         one_producer_one_consumer_one_slot(
935             num_producers: 1,
936             num_consumers: 1,
937             max_queue_size: 1,
938             messages_per_producer: 100_000,
939             notification_style: NotifyStyle::All,
940             timeout: Timeout::Forever,
941             delay_seconds: 0
942         );
943         one_producer_one_consumer_one_slot_timeout(
944             num_producers: 1,
945             num_consumers: 1,
946             max_queue_size: 1,
947             messages_per_producer: 100_000,
948             notification_style: NotifyStyle::All,
949             timeout: Timeout::Forever,
950             delay_seconds: 1
951         );
952         one_producer_one_consumer_hundred_slots(
953             num_producers: 1,
954             num_consumers: 1,
955             max_queue_size: 100,
956             messages_per_producer: 1_000_000,
957             notification_style: NotifyStyle::All,
958             timeout: Timeout::Forever,
959             delay_seconds: 0
960         );
961         ten_producers_one_consumer_one_slot(
962             num_producers: 10,
963             num_consumers: 1,
964             max_queue_size: 1,
965             messages_per_producer: 10000,
966             notification_style: NotifyStyle::All,
967             timeout: Timeout::Forever,
968             delay_seconds: 0
969         );
970         ten_producers_one_consumer_hundred_slots_notify_all(
971             num_producers: 10,
972             num_consumers: 1,
973             max_queue_size: 100,
974             messages_per_producer: 10000,
975             notification_style: NotifyStyle::All,
976             timeout: Timeout::Forever,
977             delay_seconds: 0
978         );
979         ten_producers_one_consumer_hundred_slots_notify_one(
980             num_producers: 10,
981             num_consumers: 1,
982             max_queue_size: 100,
983             messages_per_producer: 10000,
984             notification_style: NotifyStyle::One,
985             timeout: Timeout::Forever,
986             delay_seconds: 0
987         );
988         one_producer_ten_consumers_one_slot(
989             num_producers: 1,
990             num_consumers: 10,
991             max_queue_size: 1,
992             messages_per_producer: 10000,
993             notification_style: NotifyStyle::All,
994             timeout: Timeout::Forever,
995             delay_seconds: 0
996         );
997         one_producer_ten_consumers_hundred_slots_notify_all(
998             num_producers: 1,
999             num_consumers: 10,
1000             max_queue_size: 100,
1001             messages_per_producer: 100_000,
1002             notification_style: NotifyStyle::All,
1003             timeout: Timeout::Forever,
1004             delay_seconds: 0
1005         );
1006         one_producer_ten_consumers_hundred_slots_notify_one(
1007             num_producers: 1,
1008             num_consumers: 10,
1009             max_queue_size: 100,
1010             messages_per_producer: 100_000,
1011             notification_style: NotifyStyle::One,
1012             timeout: Timeout::Forever,
1013             delay_seconds: 0
1014         );
1015         ten_producers_ten_consumers_one_slot(
1016             num_producers: 10,
1017             num_consumers: 10,
1018             max_queue_size: 1,
1019             messages_per_producer: 50000,
1020             notification_style: NotifyStyle::All,
1021             timeout: Timeout::Forever,
1022             delay_seconds: 0
1023         );
1024         ten_producers_ten_consumers_hundred_slots_notify_all(
1025             num_producers: 10,
1026             num_consumers: 10,
1027             max_queue_size: 100,
1028             messages_per_producer: 50000,
1029             notification_style: NotifyStyle::All,
1030             timeout: Timeout::Forever,
1031             delay_seconds: 0
1032         );
1033         ten_producers_ten_consumers_hundred_slots_notify_one(
1034             num_producers: 10,
1035             num_consumers: 10,
1036             max_queue_size: 100,
1037             messages_per_producer: 50000,
1038             notification_style: NotifyStyle::One,
1039             timeout: Timeout::Forever,
1040             delay_seconds: 0
1041         );
1042     }
1043 }
1044