1 // Copyright 2016 Amanieu d'Antras 2 // 3 // Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or 4 // http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or 5 // http://opensource.org/licenses/MIT>, at your option. This file may not be 6 // copied, modified, or distributed except according to those terms. 7 8 use crate::mutex::MutexGuard; 9 use crate::raw_mutex::{RawMutex, TOKEN_HANDOFF, TOKEN_NORMAL}; 10 use crate::{deadlock, util}; 11 use core::{ 12 fmt, ptr, 13 sync::atomic::{AtomicPtr, Ordering}, 14 }; 15 use lock_api::RawMutex as RawMutex_; 16 use parking_lot_core::{self, ParkResult, RequeueOp, UnparkResult, DEFAULT_PARK_TOKEN}; 17 use std::time::{Duration, Instant}; 18 19 /// A type indicating whether a timed wait on a condition variable returned 20 /// due to a time out or not. 21 #[derive(Debug, PartialEq, Eq, Copy, Clone)] 22 pub struct WaitTimeoutResult(bool); 23 24 impl WaitTimeoutResult { 25 /// Returns whether the wait was known to have timed out. 26 #[inline] timed_out(self) -> bool27 pub fn timed_out(self) -> bool { 28 self.0 29 } 30 } 31 32 /// A Condition Variable 33 /// 34 /// Condition variables represent the ability to block a thread such that it 35 /// consumes no CPU time while waiting for an event to occur. Condition 36 /// variables are typically associated with a boolean predicate (a condition) 37 /// and a mutex. The predicate is always verified inside of the mutex before 38 /// determining that thread must block. 39 /// 40 /// Note that this module places one additional restriction over the system 41 /// condition variables: each condvar can be used with only one mutex at a 42 /// time. Any attempt to use multiple mutexes on the same condition variable 43 /// simultaneously will result in a runtime panic. However it is possible to 44 /// switch to a different mutex if there are no threads currently waiting on 45 /// the condition variable. 46 /// 47 /// # Differences from the standard library `Condvar` 48 /// 49 /// - No spurious wakeups: A wait will only return a non-timeout result if it 50 /// was woken up by `notify_one` or `notify_all`. 51 /// - `Condvar::notify_all` will only wake up a single thread, the rest are 52 /// requeued to wait for the `Mutex` to be unlocked by the thread that was 53 /// woken up. 54 /// - Only requires 1 word of space, whereas the standard library boxes the 55 /// `Condvar` due to platform limitations. 56 /// - Can be statically constructed (requires the `const_fn` nightly feature). 57 /// - Does not require any drop glue when dropped. 58 /// - Inline fast path for the uncontended case. 59 /// 60 /// # Examples 61 /// 62 /// ``` 63 /// use parking_lot::{Mutex, Condvar}; 64 /// use std::sync::Arc; 65 /// use std::thread; 66 /// 67 /// let pair = Arc::new((Mutex::new(false), Condvar::new())); 68 /// let pair2 = pair.clone(); 69 /// 70 /// // Inside of our lock, spawn a new thread, and then wait for it to start 71 /// thread::spawn(move|| { 72 /// let &(ref lock, ref cvar) = &*pair2; 73 /// let mut started = lock.lock(); 74 /// *started = true; 75 /// cvar.notify_one(); 76 /// }); 77 /// 78 /// // wait for the thread to start up 79 /// let &(ref lock, ref cvar) = &*pair; 80 /// let mut started = lock.lock(); 81 /// if !*started { 82 /// cvar.wait(&mut started); 83 /// } 84 /// // Note that we used an if instead of a while loop above. This is only 85 /// // possible because parking_lot's Condvar will never spuriously wake up. 86 /// // This means that wait() will only return after notify_one or notify_all is 87 /// // called. 88 /// ``` 89 pub struct Condvar { 90 state: AtomicPtr<RawMutex>, 91 } 92 93 impl Condvar { 94 /// Creates a new condition variable which is ready to be waited on and 95 /// notified. 96 #[inline] new() -> Condvar97 pub const fn new() -> Condvar { 98 Condvar { 99 state: AtomicPtr::new(ptr::null_mut()), 100 } 101 } 102 103 /// Wakes up one blocked thread on this condvar. 104 /// 105 /// Returns whether a thread was woken up. 106 /// 107 /// If there is a blocked thread on this condition variable, then it will 108 /// be woken up from its call to `wait` or `wait_timeout`. Calls to 109 /// `notify_one` are not buffered in any way. 110 /// 111 /// To wake up all threads, see `notify_all()`. 112 /// 113 /// # Examples 114 /// 115 /// ``` 116 /// use parking_lot::Condvar; 117 /// 118 /// let condvar = Condvar::new(); 119 /// 120 /// // do something with condvar, share it with other threads 121 /// 122 /// if !condvar.notify_one() { 123 /// println!("Nobody was listening for this."); 124 /// } 125 /// ``` 126 #[inline] notify_one(&self) -> bool127 pub fn notify_one(&self) -> bool { 128 // Nothing to do if there are no waiting threads 129 let state = self.state.load(Ordering::Relaxed); 130 if state.is_null() { 131 return false; 132 } 133 134 self.notify_one_slow(state) 135 } 136 137 #[cold] notify_one_slow(&self, mutex: *mut RawMutex) -> bool138 fn notify_one_slow(&self, mutex: *mut RawMutex) -> bool { 139 unsafe { 140 // Unpark one thread and requeue the rest onto the mutex 141 let from = self as *const _ as usize; 142 let to = mutex as usize; 143 let validate = || { 144 // Make sure that our atomic state still points to the same 145 // mutex. If not then it means that all threads on the current 146 // mutex were woken up and a new waiting thread switched to a 147 // different mutex. In that case we can get away with doing 148 // nothing. 149 if self.state.load(Ordering::Relaxed) != mutex { 150 return RequeueOp::Abort; 151 } 152 153 // Unpark one thread if the mutex is unlocked, otherwise just 154 // requeue everything to the mutex. This is safe to do here 155 // since unlocking the mutex when the parked bit is set requires 156 // locking the queue. There is the possibility of a race if the 157 // mutex gets locked after we check, but that doesn't matter in 158 // this case. 159 if (*mutex).mark_parked_if_locked() { 160 RequeueOp::RequeueOne 161 } else { 162 RequeueOp::UnparkOne 163 } 164 }; 165 let callback = |_op, result: UnparkResult| { 166 // Clear our state if there are no more waiting threads 167 if !result.have_more_threads { 168 self.state.store(ptr::null_mut(), Ordering::Relaxed); 169 } 170 TOKEN_NORMAL 171 }; 172 let res = parking_lot_core::unpark_requeue(from, to, validate, callback); 173 174 res.unparked_threads + res.requeued_threads != 0 175 } 176 } 177 178 /// Wakes up all blocked threads on this condvar. 179 /// 180 /// Returns the number of threads woken up. 181 /// 182 /// This method will ensure that any current waiters on the condition 183 /// variable are awoken. Calls to `notify_all()` are not buffered in any 184 /// way. 185 /// 186 /// To wake up only one thread, see `notify_one()`. 187 #[inline] notify_all(&self) -> usize188 pub fn notify_all(&self) -> usize { 189 // Nothing to do if there are no waiting threads 190 let state = self.state.load(Ordering::Relaxed); 191 if state.is_null() { 192 return 0; 193 } 194 195 self.notify_all_slow(state) 196 } 197 198 #[cold] notify_all_slow(&self, mutex: *mut RawMutex) -> usize199 fn notify_all_slow(&self, mutex: *mut RawMutex) -> usize { 200 unsafe { 201 // Unpark one thread and requeue the rest onto the mutex 202 let from = self as *const _ as usize; 203 let to = mutex as usize; 204 let validate = || { 205 // Make sure that our atomic state still points to the same 206 // mutex. If not then it means that all threads on the current 207 // mutex were woken up and a new waiting thread switched to a 208 // different mutex. In that case we can get away with doing 209 // nothing. 210 if self.state.load(Ordering::Relaxed) != mutex { 211 return RequeueOp::Abort; 212 } 213 214 // Clear our state since we are going to unpark or requeue all 215 // threads. 216 self.state.store(ptr::null_mut(), Ordering::Relaxed); 217 218 // Unpark one thread if the mutex is unlocked, otherwise just 219 // requeue everything to the mutex. This is safe to do here 220 // since unlocking the mutex when the parked bit is set requires 221 // locking the queue. There is the possibility of a race if the 222 // mutex gets locked after we check, but that doesn't matter in 223 // this case. 224 if (*mutex).mark_parked_if_locked() { 225 RequeueOp::RequeueAll 226 } else { 227 RequeueOp::UnparkOneRequeueRest 228 } 229 }; 230 let callback = |op, result: UnparkResult| { 231 // If we requeued threads to the mutex, mark it as having 232 // parked threads. The RequeueAll case is already handled above. 233 if op == RequeueOp::UnparkOneRequeueRest && result.requeued_threads != 0 { 234 (*mutex).mark_parked(); 235 } 236 TOKEN_NORMAL 237 }; 238 let res = parking_lot_core::unpark_requeue(from, to, validate, callback); 239 240 res.unparked_threads + res.requeued_threads 241 } 242 } 243 244 /// Blocks the current thread until this condition variable receives a 245 /// notification. 246 /// 247 /// This function will atomically unlock the mutex specified (represented by 248 /// `mutex_guard`) and block the current thread. This means that any calls 249 /// to `notify_*()` which happen logically after the mutex is unlocked are 250 /// candidates to wake this thread up. When this function call returns, the 251 /// lock specified will have been re-acquired. 252 /// 253 /// # Panics 254 /// 255 /// This function will panic if another thread is waiting on the `Condvar` 256 /// with a different `Mutex` object. 257 #[inline] wait<T: ?Sized>(&self, mutex_guard: &mut MutexGuard<'_, T>)258 pub fn wait<T: ?Sized>(&self, mutex_guard: &mut MutexGuard<'_, T>) { 259 self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, None); 260 } 261 262 /// Waits on this condition variable for a notification, timing out after 263 /// the specified time instant. 264 /// 265 /// The semantics of this function are equivalent to `wait()` except that 266 /// the thread will be blocked roughly until `timeout` is reached. This 267 /// method should not be used for precise timing due to anomalies such as 268 /// preemption or platform differences that may not cause the maximum 269 /// amount of time waited to be precisely `timeout`. 270 /// 271 /// Note that the best effort is made to ensure that the time waited is 272 /// measured with a monotonic clock, and not affected by the changes made to 273 /// the system time. 274 /// 275 /// The returned `WaitTimeoutResult` value indicates if the timeout is 276 /// known to have elapsed. 277 /// 278 /// Like `wait`, the lock specified will be re-acquired when this function 279 /// returns, regardless of whether the timeout elapsed or not. 280 /// 281 /// # Panics 282 /// 283 /// This function will panic if another thread is waiting on the `Condvar` 284 /// with a different `Mutex` object. 285 #[inline] wait_until<T: ?Sized>( &self, mutex_guard: &mut MutexGuard<'_, T>, timeout: Instant, ) -> WaitTimeoutResult286 pub fn wait_until<T: ?Sized>( 287 &self, 288 mutex_guard: &mut MutexGuard<'_, T>, 289 timeout: Instant, 290 ) -> WaitTimeoutResult { 291 self.wait_until_internal( 292 unsafe { MutexGuard::mutex(mutex_guard).raw() }, 293 Some(timeout), 294 ) 295 } 296 297 // This is a non-generic function to reduce the monomorphization cost of 298 // using `wait_until`. wait_until_internal(&self, mutex: &RawMutex, timeout: Option<Instant>) -> WaitTimeoutResult299 fn wait_until_internal(&self, mutex: &RawMutex, timeout: Option<Instant>) -> WaitTimeoutResult { 300 unsafe { 301 let result; 302 let mut bad_mutex = false; 303 let mut requeued = false; 304 { 305 let addr = self as *const _ as usize; 306 let lock_addr = mutex as *const _ as *mut _; 307 let validate = || { 308 // Ensure we don't use two different mutexes with the same 309 // Condvar at the same time. This is done while locked to 310 // avoid races with notify_one 311 let state = self.state.load(Ordering::Relaxed); 312 if state.is_null() { 313 self.state.store(lock_addr, Ordering::Relaxed); 314 } else if state != lock_addr { 315 bad_mutex = true; 316 return false; 317 } 318 true 319 }; 320 let before_sleep = || { 321 // Unlock the mutex before sleeping... 322 mutex.unlock(); 323 }; 324 let timed_out = |k, was_last_thread| { 325 // If we were requeued to a mutex, then we did not time out. 326 // We'll just park ourselves on the mutex again when we try 327 // to lock it later. 328 requeued = k != addr; 329 330 // If we were the last thread on the queue then we need to 331 // clear our state. This is normally done by the 332 // notify_{one,all} functions when not timing out. 333 if !requeued && was_last_thread { 334 self.state.store(ptr::null_mut(), Ordering::Relaxed); 335 } 336 }; 337 result = parking_lot_core::park( 338 addr, 339 validate, 340 before_sleep, 341 timed_out, 342 DEFAULT_PARK_TOKEN, 343 timeout, 344 ); 345 } 346 347 // Panic if we tried to use multiple mutexes with a Condvar. Note 348 // that at this point the MutexGuard is still locked. It will be 349 // unlocked by the unwinding logic. 350 if bad_mutex { 351 panic!("attempted to use a condition variable with more than one mutex"); 352 } 353 354 // ... and re-lock it once we are done sleeping 355 if result == ParkResult::Unparked(TOKEN_HANDOFF) { 356 deadlock::acquire_resource(mutex as *const _ as usize); 357 } else { 358 mutex.lock(); 359 } 360 361 WaitTimeoutResult(!(result.is_unparked() || requeued)) 362 } 363 } 364 365 /// Waits on this condition variable for a notification, timing out after a 366 /// specified duration. 367 /// 368 /// The semantics of this function are equivalent to `wait()` except that 369 /// the thread will be blocked for roughly no longer than `timeout`. This 370 /// method should not be used for precise timing due to anomalies such as 371 /// preemption or platform differences that may not cause the maximum 372 /// amount of time waited to be precisely `timeout`. 373 /// 374 /// Note that the best effort is made to ensure that the time waited is 375 /// measured with a monotonic clock, and not affected by the changes made to 376 /// the system time. 377 /// 378 /// The returned `WaitTimeoutResult` value indicates if the timeout is 379 /// known to have elapsed. 380 /// 381 /// Like `wait`, the lock specified will be re-acquired when this function 382 /// returns, regardless of whether the timeout elapsed or not. 383 #[inline] wait_for<T: ?Sized>( &self, mutex_guard: &mut MutexGuard<'_, T>, timeout: Duration, ) -> WaitTimeoutResult384 pub fn wait_for<T: ?Sized>( 385 &self, 386 mutex_guard: &mut MutexGuard<'_, T>, 387 timeout: Duration, 388 ) -> WaitTimeoutResult { 389 let deadline = util::to_deadline(timeout); 390 self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, deadline) 391 } 392 } 393 394 impl Default for Condvar { 395 #[inline] default() -> Condvar396 fn default() -> Condvar { 397 Condvar::new() 398 } 399 } 400 401 impl fmt::Debug for Condvar { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result402 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 403 f.pad("Condvar { .. }") 404 } 405 } 406 407 #[cfg(test)] 408 mod tests { 409 use crate::{Condvar, Mutex, MutexGuard}; 410 use std::sync::mpsc::channel; 411 use std::sync::Arc; 412 use std::thread; 413 use std::time::Duration; 414 use std::time::Instant; 415 416 #[test] smoke()417 fn smoke() { 418 let c = Condvar::new(); 419 c.notify_one(); 420 c.notify_all(); 421 } 422 423 #[test] notify_one()424 fn notify_one() { 425 let m = Arc::new(Mutex::new(())); 426 let m2 = m.clone(); 427 let c = Arc::new(Condvar::new()); 428 let c2 = c.clone(); 429 430 let mut g = m.lock(); 431 let _t = thread::spawn(move || { 432 let _g = m2.lock(); 433 c2.notify_one(); 434 }); 435 c.wait(&mut g); 436 } 437 438 #[test] notify_all()439 fn notify_all() { 440 const N: usize = 10; 441 442 let data = Arc::new((Mutex::new(0), Condvar::new())); 443 let (tx, rx) = channel(); 444 for _ in 0..N { 445 let data = data.clone(); 446 let tx = tx.clone(); 447 thread::spawn(move || { 448 let &(ref lock, ref cond) = &*data; 449 let mut cnt = lock.lock(); 450 *cnt += 1; 451 if *cnt == N { 452 tx.send(()).unwrap(); 453 } 454 while *cnt != 0 { 455 cond.wait(&mut cnt); 456 } 457 tx.send(()).unwrap(); 458 }); 459 } 460 drop(tx); 461 462 let &(ref lock, ref cond) = &*data; 463 rx.recv().unwrap(); 464 let mut cnt = lock.lock(); 465 *cnt = 0; 466 cond.notify_all(); 467 drop(cnt); 468 469 for _ in 0..N { 470 rx.recv().unwrap(); 471 } 472 } 473 474 #[test] notify_one_return_true()475 fn notify_one_return_true() { 476 let m = Arc::new(Mutex::new(())); 477 let m2 = m.clone(); 478 let c = Arc::new(Condvar::new()); 479 let c2 = c.clone(); 480 481 let mut g = m.lock(); 482 let _t = thread::spawn(move || { 483 let _g = m2.lock(); 484 assert!(c2.notify_one()); 485 }); 486 c.wait(&mut g); 487 } 488 489 #[test] notify_one_return_false()490 fn notify_one_return_false() { 491 let m = Arc::new(Mutex::new(())); 492 let c = Arc::new(Condvar::new()); 493 494 let _t = thread::spawn(move || { 495 let _g = m.lock(); 496 assert!(!c.notify_one()); 497 }); 498 } 499 500 #[test] notify_all_return()501 fn notify_all_return() { 502 const N: usize = 10; 503 504 let data = Arc::new((Mutex::new(0), Condvar::new())); 505 let (tx, rx) = channel(); 506 for _ in 0..N { 507 let data = data.clone(); 508 let tx = tx.clone(); 509 thread::spawn(move || { 510 let &(ref lock, ref cond) = &*data; 511 let mut cnt = lock.lock(); 512 *cnt += 1; 513 if *cnt == N { 514 tx.send(()).unwrap(); 515 } 516 while *cnt != 0 { 517 cond.wait(&mut cnt); 518 } 519 tx.send(()).unwrap(); 520 }); 521 } 522 drop(tx); 523 524 let &(ref lock, ref cond) = &*data; 525 rx.recv().unwrap(); 526 let mut cnt = lock.lock(); 527 *cnt = 0; 528 assert_eq!(cond.notify_all(), N); 529 drop(cnt); 530 531 for _ in 0..N { 532 rx.recv().unwrap(); 533 } 534 535 assert_eq!(cond.notify_all(), 0); 536 } 537 538 #[test] wait_for()539 fn wait_for() { 540 let m = Arc::new(Mutex::new(())); 541 let m2 = m.clone(); 542 let c = Arc::new(Condvar::new()); 543 let c2 = c.clone(); 544 545 let mut g = m.lock(); 546 let no_timeout = c.wait_for(&mut g, Duration::from_millis(1)); 547 assert!(no_timeout.timed_out()); 548 549 let _t = thread::spawn(move || { 550 let _g = m2.lock(); 551 c2.notify_one(); 552 }); 553 let timeout_res = c.wait_for(&mut g, Duration::from_secs(u64::max_value())); 554 assert!(!timeout_res.timed_out()); 555 556 drop(g); 557 } 558 559 #[test] wait_until()560 fn wait_until() { 561 let m = Arc::new(Mutex::new(())); 562 let m2 = m.clone(); 563 let c = Arc::new(Condvar::new()); 564 let c2 = c.clone(); 565 566 let mut g = m.lock(); 567 let no_timeout = c.wait_until(&mut g, Instant::now() + Duration::from_millis(1)); 568 assert!(no_timeout.timed_out()); 569 let _t = thread::spawn(move || { 570 let _g = m2.lock(); 571 c2.notify_one(); 572 }); 573 let timeout_res = c.wait_until( 574 &mut g, 575 Instant::now() + Duration::from_millis(u32::max_value() as u64), 576 ); 577 assert!(!timeout_res.timed_out()); 578 drop(g); 579 } 580 581 #[test] 582 #[should_panic] two_mutexes()583 fn two_mutexes() { 584 let m = Arc::new(Mutex::new(())); 585 let m2 = m.clone(); 586 let m3 = Arc::new(Mutex::new(())); 587 let c = Arc::new(Condvar::new()); 588 let c2 = c.clone(); 589 590 // Make sure we don't leave the child thread dangling 591 struct PanicGuard<'a>(&'a Condvar); 592 impl<'a> Drop for PanicGuard<'a> { 593 fn drop(&mut self) { 594 self.0.notify_one(); 595 } 596 } 597 598 let (tx, rx) = channel(); 599 let g = m.lock(); 600 let _t = thread::spawn(move || { 601 let mut g = m2.lock(); 602 tx.send(()).unwrap(); 603 c2.wait(&mut g); 604 }); 605 drop(g); 606 rx.recv().unwrap(); 607 let _g = m.lock(); 608 let _guard = PanicGuard(&*c); 609 c.wait(&mut m3.lock()); 610 } 611 612 #[test] two_mutexes_disjoint()613 fn two_mutexes_disjoint() { 614 let m = Arc::new(Mutex::new(())); 615 let m2 = m.clone(); 616 let m3 = Arc::new(Mutex::new(())); 617 let c = Arc::new(Condvar::new()); 618 let c2 = c.clone(); 619 620 let mut g = m.lock(); 621 let _t = thread::spawn(move || { 622 let _g = m2.lock(); 623 c2.notify_one(); 624 }); 625 c.wait(&mut g); 626 drop(g); 627 628 let _ = c.wait_for(&mut m3.lock(), Duration::from_millis(1)); 629 } 630 631 #[test] test_debug_condvar()632 fn test_debug_condvar() { 633 let c = Condvar::new(); 634 assert_eq!(format!("{:?}", c), "Condvar { .. }"); 635 } 636 637 #[test] test_condvar_requeue()638 fn test_condvar_requeue() { 639 let m = Arc::new(Mutex::new(())); 640 let m2 = m.clone(); 641 let c = Arc::new(Condvar::new()); 642 let c2 = c.clone(); 643 let t = thread::spawn(move || { 644 let mut g = m2.lock(); 645 c2.wait(&mut g); 646 }); 647 648 let mut g = m.lock(); 649 while !c.notify_one() { 650 // Wait for the thread to get into wait() 651 MutexGuard::bump(&mut g); 652 // Yield, so the other thread gets a chance to do something. 653 // (At least Miri needs this, because it doesn't preempt threads.) 654 thread::yield_now(); 655 } 656 // The thread should have been requeued to the mutex, which we wake up now. 657 drop(g); 658 t.join().unwrap(); 659 } 660 661 #[test] test_issue_129()662 fn test_issue_129() { 663 let locks = Arc::new((Mutex::new(()), Condvar::new())); 664 665 let (tx, rx) = channel(); 666 for _ in 0..4 { 667 let locks = locks.clone(); 668 let tx = tx.clone(); 669 thread::spawn(move || { 670 let mut guard = locks.0.lock(); 671 locks.1.wait(&mut guard); 672 locks.1.wait_for(&mut guard, Duration::from_millis(1)); 673 locks.1.notify_one(); 674 tx.send(()).unwrap(); 675 }); 676 } 677 678 thread::sleep(Duration::from_millis(100)); 679 locks.1.notify_one(); 680 681 for _ in 0..4 { 682 assert_eq!(rx.recv_timeout(Duration::from_millis(500)), Ok(())); 683 } 684 } 685 } 686 687 /// This module contains an integration test that is heavily inspired from WebKit's own integration 688 /// tests for it's own Condvar. 689 #[cfg(test)] 690 mod webkit_queue_test { 691 use crate::{Condvar, Mutex, MutexGuard}; 692 use std::{collections::VecDeque, sync::Arc, thread, time::Duration}; 693 694 #[derive(Clone, Copy)] 695 enum Timeout { 696 Bounded(Duration), 697 Forever, 698 } 699 700 #[derive(Clone, Copy)] 701 enum NotifyStyle { 702 One, 703 All, 704 } 705 706 struct Queue { 707 items: VecDeque<usize>, 708 should_continue: bool, 709 } 710 711 impl Queue { new() -> Self712 fn new() -> Self { 713 Self { 714 items: VecDeque::new(), 715 should_continue: true, 716 } 717 } 718 } 719 wait<T: ?Sized>( condition: &Condvar, lock: &mut MutexGuard<'_, T>, predicate: impl Fn(&mut MutexGuard<'_, T>) -> bool, timeout: &Timeout, )720 fn wait<T: ?Sized>( 721 condition: &Condvar, 722 lock: &mut MutexGuard<'_, T>, 723 predicate: impl Fn(&mut MutexGuard<'_, T>) -> bool, 724 timeout: &Timeout, 725 ) { 726 while !predicate(lock) { 727 match timeout { 728 Timeout::Forever => condition.wait(lock), 729 Timeout::Bounded(bound) => { 730 condition.wait_for(lock, *bound); 731 } 732 } 733 } 734 } 735 notify(style: NotifyStyle, condition: &Condvar, should_notify: bool)736 fn notify(style: NotifyStyle, condition: &Condvar, should_notify: bool) { 737 match style { 738 NotifyStyle::One => { 739 condition.notify_one(); 740 } 741 NotifyStyle::All => { 742 if should_notify { 743 condition.notify_all(); 744 } 745 } 746 } 747 } 748 run_queue_test( num_producers: usize, num_consumers: usize, max_queue_size: usize, messages_per_producer: usize, notify_style: NotifyStyle, timeout: Timeout, delay: Duration, )749 fn run_queue_test( 750 num_producers: usize, 751 num_consumers: usize, 752 max_queue_size: usize, 753 messages_per_producer: usize, 754 notify_style: NotifyStyle, 755 timeout: Timeout, 756 delay: Duration, 757 ) { 758 let input_queue = Arc::new(Mutex::new(Queue::new())); 759 let empty_condition = Arc::new(Condvar::new()); 760 let full_condition = Arc::new(Condvar::new()); 761 762 let output_vec = Arc::new(Mutex::new(vec![])); 763 764 let consumers = (0..num_consumers) 765 .map(|_| { 766 consumer_thread( 767 input_queue.clone(), 768 empty_condition.clone(), 769 full_condition.clone(), 770 timeout, 771 notify_style, 772 output_vec.clone(), 773 max_queue_size, 774 ) 775 }) 776 .collect::<Vec<_>>(); 777 let producers = (0..num_producers) 778 .map(|_| { 779 producer_thread( 780 messages_per_producer, 781 input_queue.clone(), 782 empty_condition.clone(), 783 full_condition.clone(), 784 timeout, 785 notify_style, 786 max_queue_size, 787 ) 788 }) 789 .collect::<Vec<_>>(); 790 791 thread::sleep(delay); 792 793 for producer in producers.into_iter() { 794 producer.join().expect("Producer thread panicked"); 795 } 796 797 { 798 let mut input_queue = input_queue.lock(); 799 input_queue.should_continue = false; 800 } 801 empty_condition.notify_all(); 802 803 for consumer in consumers.into_iter() { 804 consumer.join().expect("Consumer thread panicked"); 805 } 806 807 let mut output_vec = output_vec.lock(); 808 assert_eq!(output_vec.len(), num_producers * messages_per_producer); 809 output_vec.sort(); 810 for msg_idx in 0..messages_per_producer { 811 for producer_idx in 0..num_producers { 812 assert_eq!(msg_idx, output_vec[msg_idx * num_producers + producer_idx]); 813 } 814 } 815 } 816 consumer_thread( input_queue: Arc<Mutex<Queue>>, empty_condition: Arc<Condvar>, full_condition: Arc<Condvar>, timeout: Timeout, notify_style: NotifyStyle, output_queue: Arc<Mutex<Vec<usize>>>, max_queue_size: usize, ) -> thread::JoinHandle<()>817 fn consumer_thread( 818 input_queue: Arc<Mutex<Queue>>, 819 empty_condition: Arc<Condvar>, 820 full_condition: Arc<Condvar>, 821 timeout: Timeout, 822 notify_style: NotifyStyle, 823 output_queue: Arc<Mutex<Vec<usize>>>, 824 max_queue_size: usize, 825 ) -> thread::JoinHandle<()> { 826 thread::spawn(move || loop { 827 let (should_notify, result) = { 828 let mut queue = input_queue.lock(); 829 wait( 830 &*empty_condition, 831 &mut queue, 832 |state| -> bool { !state.items.is_empty() || !state.should_continue }, 833 &timeout, 834 ); 835 if queue.items.is_empty() && !queue.should_continue { 836 return; 837 } 838 let should_notify = queue.items.len() == max_queue_size; 839 let result = queue.items.pop_front(); 840 std::mem::drop(queue); 841 (should_notify, result) 842 }; 843 notify(notify_style, &*full_condition, should_notify); 844 845 if let Some(result) = result { 846 output_queue.lock().push(result); 847 } 848 }) 849 } 850 producer_thread( num_messages: usize, queue: Arc<Mutex<Queue>>, empty_condition: Arc<Condvar>, full_condition: Arc<Condvar>, timeout: Timeout, notify_style: NotifyStyle, max_queue_size: usize, ) -> thread::JoinHandle<()>851 fn producer_thread( 852 num_messages: usize, 853 queue: Arc<Mutex<Queue>>, 854 empty_condition: Arc<Condvar>, 855 full_condition: Arc<Condvar>, 856 timeout: Timeout, 857 notify_style: NotifyStyle, 858 max_queue_size: usize, 859 ) -> thread::JoinHandle<()> { 860 thread::spawn(move || { 861 for message in 0..num_messages { 862 let should_notify = { 863 let mut queue = queue.lock(); 864 wait( 865 &*full_condition, 866 &mut queue, 867 |state| state.items.len() < max_queue_size, 868 &timeout, 869 ); 870 let should_notify = queue.items.is_empty(); 871 queue.items.push_back(message); 872 std::mem::drop(queue); 873 should_notify 874 }; 875 notify(notify_style, &*empty_condition, should_notify); 876 } 877 }) 878 } 879 880 macro_rules! run_queue_tests { 881 ( $( $name:ident( 882 num_producers: $num_producers:expr, 883 num_consumers: $num_consumers:expr, 884 max_queue_size: $max_queue_size:expr, 885 messages_per_producer: $messages_per_producer:expr, 886 notification_style: $notification_style:expr, 887 timeout: $timeout:expr, 888 delay_seconds: $delay_seconds:expr); 889 )* ) => { 890 $(#[test] 891 fn $name() { 892 let delay = Duration::from_secs($delay_seconds); 893 run_queue_test( 894 $num_producers, 895 $num_consumers, 896 $max_queue_size, 897 $messages_per_producer, 898 $notification_style, 899 $timeout, 900 delay, 901 ); 902 })* 903 }; 904 } 905 906 run_queue_tests! { 907 sanity_check_queue( 908 num_producers: 1, 909 num_consumers: 1, 910 max_queue_size: 1, 911 messages_per_producer: 100_000, 912 notification_style: NotifyStyle::All, 913 timeout: Timeout::Bounded(Duration::from_secs(1)), 914 delay_seconds: 0 915 ); 916 sanity_check_queue_timeout( 917 num_producers: 1, 918 num_consumers: 1, 919 max_queue_size: 1, 920 messages_per_producer: 100_000, 921 notification_style: NotifyStyle::All, 922 timeout: Timeout::Forever, 923 delay_seconds: 0 924 ); 925 new_test_without_timeout_5( 926 num_producers: 1, 927 num_consumers: 5, 928 max_queue_size: 1, 929 messages_per_producer: 100_000, 930 notification_style: NotifyStyle::All, 931 timeout: Timeout::Forever, 932 delay_seconds: 0 933 ); 934 one_producer_one_consumer_one_slot( 935 num_producers: 1, 936 num_consumers: 1, 937 max_queue_size: 1, 938 messages_per_producer: 100_000, 939 notification_style: NotifyStyle::All, 940 timeout: Timeout::Forever, 941 delay_seconds: 0 942 ); 943 one_producer_one_consumer_one_slot_timeout( 944 num_producers: 1, 945 num_consumers: 1, 946 max_queue_size: 1, 947 messages_per_producer: 100_000, 948 notification_style: NotifyStyle::All, 949 timeout: Timeout::Forever, 950 delay_seconds: 1 951 ); 952 one_producer_one_consumer_hundred_slots( 953 num_producers: 1, 954 num_consumers: 1, 955 max_queue_size: 100, 956 messages_per_producer: 1_000_000, 957 notification_style: NotifyStyle::All, 958 timeout: Timeout::Forever, 959 delay_seconds: 0 960 ); 961 ten_producers_one_consumer_one_slot( 962 num_producers: 10, 963 num_consumers: 1, 964 max_queue_size: 1, 965 messages_per_producer: 10000, 966 notification_style: NotifyStyle::All, 967 timeout: Timeout::Forever, 968 delay_seconds: 0 969 ); 970 ten_producers_one_consumer_hundred_slots_notify_all( 971 num_producers: 10, 972 num_consumers: 1, 973 max_queue_size: 100, 974 messages_per_producer: 10000, 975 notification_style: NotifyStyle::All, 976 timeout: Timeout::Forever, 977 delay_seconds: 0 978 ); 979 ten_producers_one_consumer_hundred_slots_notify_one( 980 num_producers: 10, 981 num_consumers: 1, 982 max_queue_size: 100, 983 messages_per_producer: 10000, 984 notification_style: NotifyStyle::One, 985 timeout: Timeout::Forever, 986 delay_seconds: 0 987 ); 988 one_producer_ten_consumers_one_slot( 989 num_producers: 1, 990 num_consumers: 10, 991 max_queue_size: 1, 992 messages_per_producer: 10000, 993 notification_style: NotifyStyle::All, 994 timeout: Timeout::Forever, 995 delay_seconds: 0 996 ); 997 one_producer_ten_consumers_hundred_slots_notify_all( 998 num_producers: 1, 999 num_consumers: 10, 1000 max_queue_size: 100, 1001 messages_per_producer: 100_000, 1002 notification_style: NotifyStyle::All, 1003 timeout: Timeout::Forever, 1004 delay_seconds: 0 1005 ); 1006 one_producer_ten_consumers_hundred_slots_notify_one( 1007 num_producers: 1, 1008 num_consumers: 10, 1009 max_queue_size: 100, 1010 messages_per_producer: 100_000, 1011 notification_style: NotifyStyle::One, 1012 timeout: Timeout::Forever, 1013 delay_seconds: 0 1014 ); 1015 ten_producers_ten_consumers_one_slot( 1016 num_producers: 10, 1017 num_consumers: 10, 1018 max_queue_size: 1, 1019 messages_per_producer: 50000, 1020 notification_style: NotifyStyle::All, 1021 timeout: Timeout::Forever, 1022 delay_seconds: 0 1023 ); 1024 ten_producers_ten_consumers_hundred_slots_notify_all( 1025 num_producers: 10, 1026 num_consumers: 10, 1027 max_queue_size: 100, 1028 messages_per_producer: 50000, 1029 notification_style: NotifyStyle::All, 1030 timeout: Timeout::Forever, 1031 delay_seconds: 0 1032 ); 1033 ten_producers_ten_consumers_hundred_slots_notify_one( 1034 num_producers: 10, 1035 num_consumers: 10, 1036 max_queue_size: 100, 1037 messages_per_producer: 50000, 1038 notification_style: NotifyStyle::One, 1039 timeout: Timeout::Forever, 1040 delay_seconds: 0 1041 ); 1042 } 1043 } 1044