1 //! Implements threads. 2 3 use std::cell::RefCell; 4 use std::collections::hash_map::Entry; 5 use std::num::TryFromIntError; 6 use std::sync::atomic::{AtomicBool, Ordering::Relaxed}; 7 use std::task::Poll; 8 use std::time::{Duration, SystemTime}; 9 10 use log::trace; 11 12 use rustc_data_structures::fx::FxHashMap; 13 use rustc_hir::def_id::DefId; 14 use rustc_index::{Idx, IndexVec}; 15 use rustc_middle::mir::Mutability; 16 use rustc_middle::ty::layout::TyAndLayout; 17 use rustc_span::Span; 18 use rustc_target::spec::abi::Abi; 19 20 use crate::concurrency::data_race; 21 use crate::concurrency::sync::SynchronizationState; 22 use crate::shims::tls; 23 use crate::*; 24 25 #[derive(Clone, Copy, Debug, PartialEq, Eq)] 26 enum SchedulingAction { 27 /// Execute step on the active thread. 28 ExecuteStep, 29 /// Execute a timeout callback. 30 ExecuteTimeoutCallback, 31 /// Wait for a bit, until there is a timeout to be called. 32 Sleep(Duration), 33 } 34 35 /// Trait for callbacks that can be executed when some event happens, such as after a timeout. 36 pub trait MachineCallback<'mir, 'tcx>: VisitTags { call(&self, ecx: &mut InterpCx<'mir, 'tcx, MiriMachine<'mir, 'tcx>>) -> InterpResult<'tcx>37 fn call(&self, ecx: &mut InterpCx<'mir, 'tcx, MiriMachine<'mir, 'tcx>>) -> InterpResult<'tcx>; 38 } 39 40 type TimeoutCallback<'mir, 'tcx> = Box<dyn MachineCallback<'mir, 'tcx> + 'tcx>; 41 42 /// A thread identifier. 
43 #[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)] 44 pub struct ThreadId(u32); 45 46 impl ThreadId { to_u32(self) -> u3247 pub fn to_u32(self) -> u32 { 48 self.0 49 } 50 } 51 52 impl Idx for ThreadId { new(idx: usize) -> Self53 fn new(idx: usize) -> Self { 54 ThreadId(u32::try_from(idx).unwrap()) 55 } 56 index(self) -> usize57 fn index(self) -> usize { 58 usize::try_from(self.0).unwrap() 59 } 60 } 61 62 impl TryFrom<u64> for ThreadId { 63 type Error = TryFromIntError; try_from(id: u64) -> Result<Self, Self::Error>64 fn try_from(id: u64) -> Result<Self, Self::Error> { 65 u32::try_from(id).map(Self) 66 } 67 } 68 69 impl From<u32> for ThreadId { from(id: u32) -> Self70 fn from(id: u32) -> Self { 71 Self(id) 72 } 73 } 74 75 impl From<ThreadId> for u64 { from(t: ThreadId) -> Self76 fn from(t: ThreadId) -> Self { 77 t.0.into() 78 } 79 } 80 81 /// The state of a thread. 82 #[derive(Debug, Copy, Clone, PartialEq, Eq)] 83 pub enum ThreadState { 84 /// The thread is enabled and can be executed. 85 Enabled, 86 /// The thread tried to join the specified thread and is blocked until that 87 /// thread terminates. 88 BlockedOnJoin(ThreadId), 89 /// The thread is blocked on some synchronization primitive. It is the 90 /// responsibility of the synchronization primitives to track threads that 91 /// are blocked by them. 92 BlockedOnSync, 93 /// The thread has terminated its execution. We do not delete terminated 94 /// threads (FIXME: why?). 95 Terminated, 96 } 97 98 /// The join status of a thread. 99 #[derive(Debug, Copy, Clone, PartialEq, Eq)] 100 enum ThreadJoinStatus { 101 /// The thread can be joined. 102 Joinable, 103 /// A thread is detached if its join handle was destroyed and no other 104 /// thread can join it. 105 Detached, 106 /// The thread was already joined by some thread and cannot be joined again. 107 Joined, 108 } 109 110 /// A thread. 111 pub struct Thread<'mir, 'tcx> { 112 state: ThreadState, 113 114 /// Name of the thread. 
115 thread_name: Option<Vec<u8>>, 116 117 /// The virtual call stack. 118 stack: Vec<Frame<'mir, 'tcx, Provenance, FrameExtra<'tcx>>>, 119 120 /// The function to call when the stack ran empty, to figure out what to do next. 121 /// Conceptually, this is the interpreter implementation of the things that happen 'after' the 122 /// Rust language entry point for this thread returns (usually implemented by the C or OS runtime). 123 /// (`None` is an error, it means the callback has not been set up yet or is actively running.) 124 pub(crate) on_stack_empty: Option<StackEmptyCallback<'mir, 'tcx>>, 125 126 /// The index of the topmost user-relevant frame in `stack`. This field must contain 127 /// the value produced by `get_top_user_relevant_frame`. 128 /// The `None` state here represents 129 /// This field is a cache to reduce how often we call that method. The cache is manually 130 /// maintained inside `MiriMachine::after_stack_push` and `MiriMachine::after_stack_pop`. 131 top_user_relevant_frame: Option<usize>, 132 133 /// The join status. 134 join_status: ThreadJoinStatus, 135 136 /// Stack of active panic payloads for the current thread. Used for storing 137 /// the argument of the call to `miri_start_panic` (the panic payload) when unwinding. 138 /// This is pointer-sized, and matches the `Payload` type in `src/libpanic_unwind/miri.rs`. 139 /// 140 /// In real unwinding, the payload gets passed as an argument to the landing pad, 141 /// which then forwards it to 'Resume'. However this argument is implicit in MIR, 142 /// so we have to store it out-of-band. When there are multiple active unwinds, 143 /// the innermost one is always caught first, so we can store them as a stack. 144 pub(crate) panic_payloads: Vec<Scalar<Provenance>>, 145 146 /// Last OS error location in memory. It is a 32-bit integer. 
147 pub(crate) last_error: Option<MPlaceTy<'tcx, Provenance>>, 148 } 149 150 pub type StackEmptyCallback<'mir, 'tcx> = 151 Box<dyn FnMut(&mut MiriInterpCx<'mir, 'tcx>) -> InterpResult<'tcx, Poll<()>>>; 152 153 impl<'mir, 'tcx> Thread<'mir, 'tcx> { 154 /// Get the name of the current thread, or `<unnamed>` if it was not set. thread_name(&self) -> &[u8]155 fn thread_name(&self) -> &[u8] { 156 if let Some(ref thread_name) = self.thread_name { thread_name } else { b"<unnamed>" } 157 } 158 159 /// Return the top user-relevant frame, if there is one. 160 /// Note that the choice to return `None` here when there is no user-relevant frame is part of 161 /// justifying the optimization that only pushes of user-relevant frames require updating the 162 /// `top_user_relevant_frame` field. compute_top_user_relevant_frame(&self) -> Option<usize>163 fn compute_top_user_relevant_frame(&self) -> Option<usize> { 164 self.stack 165 .iter() 166 .enumerate() 167 .rev() 168 .find_map(|(idx, frame)| if frame.extra.is_user_relevant { Some(idx) } else { None }) 169 } 170 171 /// Re-compute the top user-relevant frame from scratch. recompute_top_user_relevant_frame(&mut self)172 pub fn recompute_top_user_relevant_frame(&mut self) { 173 self.top_user_relevant_frame = self.compute_top_user_relevant_frame(); 174 } 175 176 /// Set the top user-relevant frame to the given value. Must be equal to what 177 /// `get_top_user_relevant_frame` would return! set_top_user_relevant_frame(&mut self, frame_idx: usize)178 pub fn set_top_user_relevant_frame(&mut self, frame_idx: usize) { 179 debug_assert_eq!(Some(frame_idx), self.compute_top_user_relevant_frame()); 180 self.top_user_relevant_frame = Some(frame_idx); 181 } 182 183 /// Returns the topmost frame that is considered user-relevant, or the 184 /// top of the stack if there is no such frame, or `None` if the stack is empty. 
top_user_relevant_frame(&self) -> Option<usize>185 pub fn top_user_relevant_frame(&self) -> Option<usize> { 186 debug_assert_eq!(self.top_user_relevant_frame, self.compute_top_user_relevant_frame()); 187 // This can be called upon creation of an allocation. We create allocations while setting up 188 // parts of the Rust runtime when we do not have any stack frames yet, so we need to handle 189 // empty stacks. 190 self.top_user_relevant_frame.or_else(|| self.stack.len().checked_sub(1)) 191 } 192 } 193 194 impl<'mir, 'tcx> std::fmt::Debug for Thread<'mir, 'tcx> { fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result195 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 196 write!( 197 f, 198 "{}({:?}, {:?})", 199 String::from_utf8_lossy(self.thread_name()), 200 self.state, 201 self.join_status 202 ) 203 } 204 } 205 206 impl<'mir, 'tcx> Thread<'mir, 'tcx> { new(name: Option<&str>, on_stack_empty: Option<StackEmptyCallback<'mir, 'tcx>>) -> Self207 fn new(name: Option<&str>, on_stack_empty: Option<StackEmptyCallback<'mir, 'tcx>>) -> Self { 208 Self { 209 state: ThreadState::Enabled, 210 thread_name: name.map(|name| Vec::from(name.as_bytes())), 211 stack: Vec::new(), 212 top_user_relevant_frame: None, 213 join_status: ThreadJoinStatus::Joinable, 214 panic_payloads: Vec::new(), 215 last_error: None, 216 on_stack_empty, 217 } 218 } 219 } 220 221 impl VisitTags for Thread<'_, '_> { visit_tags(&self, visit: &mut dyn FnMut(BorTag))222 fn visit_tags(&self, visit: &mut dyn FnMut(BorTag)) { 223 let Thread { 224 panic_payloads: panic_payload, 225 last_error, 226 stack, 227 top_user_relevant_frame: _, 228 state: _, 229 thread_name: _, 230 join_status: _, 231 on_stack_empty: _, // we assume the closure captures no GC-relevant state 232 } = self; 233 234 for payload in panic_payload { 235 payload.visit_tags(visit); 236 } 237 last_error.visit_tags(visit); 238 for frame in stack { 239 frame.visit_tags(visit) 240 } 241 } 242 } 243 244 impl VisitTags for 
Frame<'_, '_, Provenance, FrameExtra<'_>> { visit_tags(&self, visit: &mut dyn FnMut(BorTag))245 fn visit_tags(&self, visit: &mut dyn FnMut(BorTag)) { 246 let Frame { 247 return_place, 248 locals, 249 extra, 250 body: _, 251 instance: _, 252 return_to_block: _, 253 loc: _, 254 // There are some private fields we cannot access; they contain no tags. 255 .. 256 } = self; 257 258 // Return place. 259 return_place.visit_tags(visit); 260 // Locals. 261 for local in locals.iter() { 262 if let LocalValue::Live(value) = &local.value { 263 value.visit_tags(visit); 264 } 265 } 266 267 extra.visit_tags(visit); 268 } 269 } 270 271 /// A specific moment in time. 272 #[derive(Debug)] 273 pub enum Time { 274 Monotonic(Instant), 275 RealTime(SystemTime), 276 } 277 278 impl Time { 279 /// How long do we have to wait from now until the specified time? get_wait_time(&self, clock: &Clock) -> Duration280 fn get_wait_time(&self, clock: &Clock) -> Duration { 281 match self { 282 Time::Monotonic(instant) => instant.duration_since(clock.now()), 283 Time::RealTime(time) => 284 time.duration_since(SystemTime::now()).unwrap_or(Duration::new(0, 0)), 285 } 286 } 287 } 288 289 /// Callbacks are used to implement timeouts. For example, waiting on a 290 /// conditional variable with a timeout creates a callback that is called after 291 /// the specified time and unblocks the thread. If another thread signals on the 292 /// conditional variable, the signal handler deletes the callback. 293 struct TimeoutCallbackInfo<'mir, 'tcx> { 294 /// The callback should be called no earlier than this time. 295 call_time: Time, 296 /// The called function. 297 callback: TimeoutCallback<'mir, 'tcx>, 298 } 299 300 impl<'mir, 'tcx> std::fmt::Debug for TimeoutCallbackInfo<'mir, 'tcx> { fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result301 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 302 write!(f, "TimeoutCallback({:?})", self.call_time) 303 } 304 } 305 306 /// A set of threads. 
307 #[derive(Debug)] 308 pub struct ThreadManager<'mir, 'tcx> { 309 /// Identifier of the currently active thread. 310 active_thread: ThreadId, 311 /// Threads used in the program. 312 /// 313 /// Note that this vector also contains terminated threads. 314 threads: IndexVec<ThreadId, Thread<'mir, 'tcx>>, 315 /// This field is pub(crate) because the synchronization primitives 316 /// (`crate::sync`) need a way to access it. 317 pub(crate) sync: SynchronizationState<'mir, 'tcx>, 318 /// A mapping from a thread-local static to an allocation id of a thread 319 /// specific allocation. 320 thread_local_alloc_ids: RefCell<FxHashMap<(DefId, ThreadId), Pointer<Provenance>>>, 321 /// A flag that indicates that we should change the active thread. 322 yield_active_thread: bool, 323 /// Callbacks that are called once the specified time passes. 324 timeout_callbacks: FxHashMap<ThreadId, TimeoutCallbackInfo<'mir, 'tcx>>, 325 } 326 327 impl VisitTags for ThreadManager<'_, '_> { visit_tags(&self, visit: &mut dyn FnMut(BorTag))328 fn visit_tags(&self, visit: &mut dyn FnMut(BorTag)) { 329 let ThreadManager { 330 threads, 331 thread_local_alloc_ids, 332 timeout_callbacks, 333 active_thread: _, 334 yield_active_thread: _, 335 sync, 336 } = self; 337 338 for thread in threads { 339 thread.visit_tags(visit); 340 } 341 for ptr in thread_local_alloc_ids.borrow().values() { 342 ptr.visit_tags(visit); 343 } 344 for callback in timeout_callbacks.values() { 345 callback.callback.visit_tags(visit); 346 } 347 sync.visit_tags(visit); 348 } 349 } 350 351 impl<'mir, 'tcx> Default for ThreadManager<'mir, 'tcx> { default() -> Self352 fn default() -> Self { 353 let mut threads = IndexVec::new(); 354 // Create the main thread and add it to the list of threads. 
355 threads.push(Thread::new(Some("main"), None)); 356 Self { 357 active_thread: ThreadId::new(0), 358 threads, 359 sync: SynchronizationState::default(), 360 thread_local_alloc_ids: Default::default(), 361 yield_active_thread: false, 362 timeout_callbacks: FxHashMap::default(), 363 } 364 } 365 } 366 367 impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> { init( ecx: &mut MiriInterpCx<'mir, 'tcx>, on_main_stack_empty: StackEmptyCallback<'mir, 'tcx>, )368 pub(crate) fn init( 369 ecx: &mut MiriInterpCx<'mir, 'tcx>, 370 on_main_stack_empty: StackEmptyCallback<'mir, 'tcx>, 371 ) { 372 ecx.machine.threads.threads[ThreadId::new(0)].on_stack_empty = Some(on_main_stack_empty); 373 if ecx.tcx.sess.target.os.as_ref() != "windows" { 374 // The main thread can *not* be joined on except on windows. 375 ecx.machine.threads.threads[ThreadId::new(0)].join_status = ThreadJoinStatus::Detached; 376 } 377 } 378 379 /// Check if we have an allocation for the given thread local static for the 380 /// active thread. get_thread_local_alloc_id(&self, def_id: DefId) -> Option<Pointer<Provenance>>381 fn get_thread_local_alloc_id(&self, def_id: DefId) -> Option<Pointer<Provenance>> { 382 self.thread_local_alloc_ids.borrow().get(&(def_id, self.active_thread)).cloned() 383 } 384 385 /// Set the pointer for the allocation of the given thread local 386 /// static for the active thread. 387 /// 388 /// Panics if a thread local is initialized twice for the same thread. set_thread_local_alloc(&self, def_id: DefId, ptr: Pointer<Provenance>)389 fn set_thread_local_alloc(&self, def_id: DefId, ptr: Pointer<Provenance>) { 390 self.thread_local_alloc_ids 391 .borrow_mut() 392 .try_insert((def_id, self.active_thread), ptr) 393 .unwrap(); 394 } 395 396 /// Borrow the stack of the active thread. 
active_thread_stack(&self) -> &[Frame<'mir, 'tcx, Provenance, FrameExtra<'tcx>>]397 pub fn active_thread_stack(&self) -> &[Frame<'mir, 'tcx, Provenance, FrameExtra<'tcx>>] { 398 &self.threads[self.active_thread].stack 399 } 400 401 /// Mutably borrow the stack of the active thread. active_thread_stack_mut( &mut self, ) -> &mut Vec<Frame<'mir, 'tcx, Provenance, FrameExtra<'tcx>>>402 fn active_thread_stack_mut( 403 &mut self, 404 ) -> &mut Vec<Frame<'mir, 'tcx, Provenance, FrameExtra<'tcx>>> { 405 &mut self.threads[self.active_thread].stack 406 } 407 all_stacks( &self, ) -> impl Iterator<Item = &[Frame<'mir, 'tcx, Provenance, FrameExtra<'tcx>>]>408 pub fn all_stacks( 409 &self, 410 ) -> impl Iterator<Item = &[Frame<'mir, 'tcx, Provenance, FrameExtra<'tcx>>]> { 411 self.threads.iter().map(|t| &t.stack[..]) 412 } 413 414 /// Create a new thread and returns its id. create_thread(&mut self, on_stack_empty: StackEmptyCallback<'mir, 'tcx>) -> ThreadId415 fn create_thread(&mut self, on_stack_empty: StackEmptyCallback<'mir, 'tcx>) -> ThreadId { 416 let new_thread_id = ThreadId::new(self.threads.len()); 417 self.threads.push(Thread::new(None, Some(on_stack_empty))); 418 new_thread_id 419 } 420 421 /// Set an active thread and return the id of the thread that was active before. set_active_thread_id(&mut self, id: ThreadId) -> ThreadId422 fn set_active_thread_id(&mut self, id: ThreadId) -> ThreadId { 423 let active_thread_id = self.active_thread; 424 self.active_thread = id; 425 assert!(self.active_thread.index() < self.threads.len()); 426 active_thread_id 427 } 428 429 /// Get the id of the currently active thread. get_active_thread_id(&self) -> ThreadId430 pub fn get_active_thread_id(&self) -> ThreadId { 431 self.active_thread 432 } 433 434 /// Get the total number of threads that were ever spawn by this program. 
get_total_thread_count(&self) -> usize435 pub fn get_total_thread_count(&self) -> usize { 436 self.threads.len() 437 } 438 439 /// Get the total of threads that are currently live, i.e., not yet terminated. 440 /// (They might be blocked.) get_live_thread_count(&self) -> usize441 pub fn get_live_thread_count(&self) -> usize { 442 self.threads.iter().filter(|t| !matches!(t.state, ThreadState::Terminated)).count() 443 } 444 445 /// Has the given thread terminated? has_terminated(&self, thread_id: ThreadId) -> bool446 fn has_terminated(&self, thread_id: ThreadId) -> bool { 447 self.threads[thread_id].state == ThreadState::Terminated 448 } 449 450 /// Have all threads terminated? have_all_terminated(&self) -> bool451 fn have_all_terminated(&self) -> bool { 452 self.threads.iter().all(|thread| thread.state == ThreadState::Terminated) 453 } 454 455 /// Enable the thread for execution. The thread must be terminated. enable_thread(&mut self, thread_id: ThreadId)456 fn enable_thread(&mut self, thread_id: ThreadId) { 457 assert!(self.has_terminated(thread_id)); 458 self.threads[thread_id].state = ThreadState::Enabled; 459 } 460 461 /// Get a mutable borrow of the currently active thread. active_thread_mut(&mut self) -> &mut Thread<'mir, 'tcx>462 pub fn active_thread_mut(&mut self) -> &mut Thread<'mir, 'tcx> { 463 &mut self.threads[self.active_thread] 464 } 465 466 /// Get a shared borrow of the currently active thread. active_thread_ref(&self) -> &Thread<'mir, 'tcx>467 pub fn active_thread_ref(&self) -> &Thread<'mir, 'tcx> { 468 &self.threads[self.active_thread] 469 } 470 471 /// Mark the thread as detached, which means that no other thread will try 472 /// to join it and the thread is responsible for cleaning up. 473 /// 474 /// `allow_terminated_joined` allows detaching joined threads that have already terminated. 475 /// This matches Windows's behavior for `CloseHandle`. 
476 /// 477 /// See <https://docs.microsoft.com/en-us/windows/win32/procthread/thread-handles-and-identifiers>: 478 /// > The handle is valid until closed, even after the thread it represents has been terminated. detach_thread(&mut self, id: ThreadId, allow_terminated_joined: bool) -> InterpResult<'tcx>479 fn detach_thread(&mut self, id: ThreadId, allow_terminated_joined: bool) -> InterpResult<'tcx> { 480 trace!("detaching {:?}", id); 481 482 let is_ub = if allow_terminated_joined && self.threads[id].state == ThreadState::Terminated 483 { 484 // "Detached" in particular means "not yet joined". Redundant detaching is still UB. 485 self.threads[id].join_status == ThreadJoinStatus::Detached 486 } else { 487 self.threads[id].join_status != ThreadJoinStatus::Joinable 488 }; 489 if is_ub { 490 throw_ub_format!("trying to detach thread that was already detached or joined"); 491 } 492 493 self.threads[id].join_status = ThreadJoinStatus::Detached; 494 Ok(()) 495 } 496 497 /// Mark that the active thread tries to join the thread with `joined_thread_id`. join_thread( &mut self, joined_thread_id: ThreadId, data_race: Option<&mut data_race::GlobalState>, ) -> InterpResult<'tcx>498 fn join_thread( 499 &mut self, 500 joined_thread_id: ThreadId, 501 data_race: Option<&mut data_race::GlobalState>, 502 ) -> InterpResult<'tcx> { 503 if self.threads[joined_thread_id].join_status == ThreadJoinStatus::Detached { 504 // On Windows this corresponds to joining on a closed handle. 505 throw_ub_format!("trying to join a detached thread"); 506 } 507 508 // Mark the joined thread as being joined so that we detect if other 509 // threads try to join it. 510 self.threads[joined_thread_id].join_status = ThreadJoinStatus::Joined; 511 if self.threads[joined_thread_id].state != ThreadState::Terminated { 512 // The joined thread is still running, we need to wait for it. 
513 self.active_thread_mut().state = ThreadState::BlockedOnJoin(joined_thread_id); 514 trace!( 515 "{:?} blocked on {:?} when trying to join", 516 self.active_thread, 517 joined_thread_id 518 ); 519 } else { 520 // The thread has already terminated - mark join happens-before 521 if let Some(data_race) = data_race { 522 data_race.thread_joined(self, self.active_thread, joined_thread_id); 523 } 524 } 525 Ok(()) 526 } 527 528 /// Mark that the active thread tries to exclusively join the thread with `joined_thread_id`. 529 /// If the thread is already joined by another thread, it will throw UB join_thread_exclusive( &mut self, joined_thread_id: ThreadId, data_race: Option<&mut data_race::GlobalState>, ) -> InterpResult<'tcx>530 fn join_thread_exclusive( 531 &mut self, 532 joined_thread_id: ThreadId, 533 data_race: Option<&mut data_race::GlobalState>, 534 ) -> InterpResult<'tcx> { 535 if self.threads[joined_thread_id].join_status == ThreadJoinStatus::Joined { 536 throw_ub_format!("trying to join an already joined thread"); 537 } 538 539 if joined_thread_id == self.active_thread { 540 throw_ub_format!("trying to join itself"); 541 } 542 543 assert!( 544 self.threads 545 .iter() 546 .all(|thread| thread.state != ThreadState::BlockedOnJoin(joined_thread_id)), 547 "this thread already has threads waiting for its termination" 548 ); 549 550 self.join_thread(joined_thread_id, data_race) 551 } 552 553 /// Set the name of the given thread. set_thread_name(&mut self, thread: ThreadId, new_thread_name: Vec<u8>)554 pub fn set_thread_name(&mut self, thread: ThreadId, new_thread_name: Vec<u8>) { 555 self.threads[thread].thread_name = Some(new_thread_name); 556 } 557 558 /// Get the name of the given thread. get_thread_name(&self, thread: ThreadId) -> &[u8]559 pub fn get_thread_name(&self, thread: ThreadId) -> &[u8] { 560 self.threads[thread].thread_name() 561 } 562 563 /// Put the thread into the blocked state. 
block_thread(&mut self, thread: ThreadId)564 fn block_thread(&mut self, thread: ThreadId) { 565 let state = &mut self.threads[thread].state; 566 assert_eq!(*state, ThreadState::Enabled); 567 *state = ThreadState::BlockedOnSync; 568 } 569 570 /// Put the blocked thread into the enabled state. unblock_thread(&mut self, thread: ThreadId)571 fn unblock_thread(&mut self, thread: ThreadId) { 572 let state = &mut self.threads[thread].state; 573 assert_eq!(*state, ThreadState::BlockedOnSync); 574 *state = ThreadState::Enabled; 575 } 576 577 /// Change the active thread to some enabled thread. yield_active_thread(&mut self)578 fn yield_active_thread(&mut self) { 579 // We do not yield immediately, as swapping out the current stack while executing a MIR statement 580 // could lead to all sorts of confusion. 581 // We should only switch stacks between steps. 582 self.yield_active_thread = true; 583 } 584 585 /// Register the given `callback` to be called once the `call_time` passes. 586 /// 587 /// The callback will be called with `thread` being the active thread, and 588 /// the callback may not change the active thread. register_timeout_callback( &mut self, thread: ThreadId, call_time: Time, callback: TimeoutCallback<'mir, 'tcx>, )589 fn register_timeout_callback( 590 &mut self, 591 thread: ThreadId, 592 call_time: Time, 593 callback: TimeoutCallback<'mir, 'tcx>, 594 ) { 595 self.timeout_callbacks 596 .try_insert(thread, TimeoutCallbackInfo { call_time, callback }) 597 .unwrap(); 598 } 599 600 /// Unregister the callback for the `thread`. unregister_timeout_callback_if_exists(&mut self, thread: ThreadId)601 fn unregister_timeout_callback_if_exists(&mut self, thread: ThreadId) { 602 self.timeout_callbacks.remove(&thread); 603 } 604 605 /// Get a callback that is ready to be called. 
get_ready_callback( &mut self, clock: &Clock, ) -> Option<(ThreadId, TimeoutCallback<'mir, 'tcx>)>606 fn get_ready_callback( 607 &mut self, 608 clock: &Clock, 609 ) -> Option<(ThreadId, TimeoutCallback<'mir, 'tcx>)> { 610 // We iterate over all threads in the order of their indices because 611 // this allows us to have a deterministic scheduler. 612 for thread in self.threads.indices() { 613 match self.timeout_callbacks.entry(thread) { 614 Entry::Occupied(entry) => { 615 if entry.get().call_time.get_wait_time(clock) == Duration::new(0, 0) { 616 return Some((thread, entry.remove().callback)); 617 } 618 } 619 Entry::Vacant(_) => {} 620 } 621 } 622 None 623 } 624 625 /// Wakes up threads joining on the active one and deallocates thread-local statics. 626 /// The `AllocId` that can now be freed are returned. thread_terminated( &mut self, mut data_race: Option<&mut data_race::GlobalState>, current_span: Span, ) -> Vec<Pointer<Provenance>>627 fn thread_terminated( 628 &mut self, 629 mut data_race: Option<&mut data_race::GlobalState>, 630 current_span: Span, 631 ) -> Vec<Pointer<Provenance>> { 632 let mut free_tls_statics = Vec::new(); 633 { 634 let mut thread_local_statics = self.thread_local_alloc_ids.borrow_mut(); 635 thread_local_statics.retain(|&(_def_id, thread), &mut alloc_id| { 636 if thread != self.active_thread { 637 // Keep this static around. 638 return true; 639 } 640 // Delete this static from the map and from memory. 641 // We cannot free directly here as we cannot use `?` in this context. 642 free_tls_statics.push(alloc_id); 643 false 644 }); 645 } 646 // Set the thread into a terminated state in the data-race detector. 647 if let Some(ref mut data_race) = data_race { 648 data_race.thread_terminated(self, current_span); 649 } 650 // Check if we need to unblock any threads. 
651 let mut joined_threads = vec![]; // store which threads joined, we'll need it 652 for (i, thread) in self.threads.iter_enumerated_mut() { 653 if thread.state == ThreadState::BlockedOnJoin(self.active_thread) { 654 // The thread has terminated, mark happens-before edge to joining thread 655 if data_race.is_some() { 656 joined_threads.push(i); 657 } 658 trace!("unblocking {:?} because {:?} terminated", i, self.active_thread); 659 thread.state = ThreadState::Enabled; 660 } 661 } 662 for &i in &joined_threads { 663 data_race.as_mut().unwrap().thread_joined(self, i, self.active_thread); 664 } 665 free_tls_statics 666 } 667 668 /// Decide which action to take next and on which thread. 669 /// 670 /// The currently implemented scheduling policy is the one that is commonly 671 /// used in stateless model checkers such as Loom: run the active thread as 672 /// long as we can and switch only when we have to (the active thread was 673 /// blocked, terminated, or has explicitly asked to be preempted). schedule(&mut self, clock: &Clock) -> InterpResult<'tcx, SchedulingAction>674 fn schedule(&mut self, clock: &Clock) -> InterpResult<'tcx, SchedulingAction> { 675 // This thread and the program can keep going. 676 if self.threads[self.active_thread].state == ThreadState::Enabled 677 && !self.yield_active_thread 678 { 679 // The currently active thread is still enabled, just continue with it. 680 return Ok(SchedulingAction::ExecuteStep); 681 } 682 // The active thread yielded or got terminated. Let's see if there are any timeouts to take 683 // care of. We do this *before* running any other thread, to ensure that timeouts "in the 684 // past" fire before any other thread can take an action. This ensures that for 685 // `pthread_cond_timedwait`, "an error is returned if [...] the absolute time specified by 686 // abstime has already been passed at the time of the call". 
687 // <https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_cond_timedwait.html> 688 let potential_sleep_time = 689 self.timeout_callbacks.values().map(|info| info.call_time.get_wait_time(clock)).min(); 690 if potential_sleep_time == Some(Duration::new(0, 0)) { 691 return Ok(SchedulingAction::ExecuteTimeoutCallback); 692 } 693 // No callbacks immediately scheduled, pick a regular thread to execute. 694 // The active thread blocked or yielded. So we go search for another enabled thread. 695 // Crucially, we start searching at the current active thread ID, rather than at 0, since we 696 // want to avoid always scheduling threads 0 and 1 without ever making progress in thread 2. 697 // 698 // `skip(N)` means we start iterating at thread N, so we skip 1 more to start just *after* 699 // the active thread. Then after that we look at `take(N)`, i.e., the threads *before* the 700 // active thread. 701 let threads = self 702 .threads 703 .iter_enumerated() 704 .skip(self.active_thread.index() + 1) 705 .chain(self.threads.iter_enumerated().take(self.active_thread.index())); 706 for (id, thread) in threads { 707 debug_assert_ne!(self.active_thread, id); 708 if thread.state == ThreadState::Enabled { 709 self.active_thread = id; 710 break; 711 } 712 } 713 self.yield_active_thread = false; 714 if self.threads[self.active_thread].state == ThreadState::Enabled { 715 return Ok(SchedulingAction::ExecuteStep); 716 } 717 // We have not found a thread to execute. 718 if self.threads.iter().all(|thread| thread.state == ThreadState::Terminated) { 719 unreachable!("all threads terminated without the main thread terminating?!"); 720 } else if let Some(sleep_time) = potential_sleep_time { 721 // All threads are currently blocked, but we have unexecuted 722 // timeout_callbacks, which may unblock some of the threads. Hence, 723 // sleep until the first callback. 
724 Ok(SchedulingAction::Sleep(sleep_time)) 725 } else { 726 throw_machine_stop!(TerminationInfo::Deadlock); 727 } 728 } 729 } 730 731 impl<'mir, 'tcx: 'mir> EvalContextPrivExt<'mir, 'tcx> for MiriInterpCx<'mir, 'tcx> {} 732 trait EvalContextPrivExt<'mir, 'tcx: 'mir>: MiriInterpCxExt<'mir, 'tcx> { 733 /// Execute a timeout callback on the callback's thread. 734 #[inline] run_timeout_callback(&mut self) -> InterpResult<'tcx>735 fn run_timeout_callback(&mut self) -> InterpResult<'tcx> { 736 let this = self.eval_context_mut(); 737 let (thread, callback) = if let Some((thread, callback)) = 738 this.machine.threads.get_ready_callback(&this.machine.clock) 739 { 740 (thread, callback) 741 } else { 742 // get_ready_callback can return None if the computer's clock 743 // was shifted after calling the scheduler and before the call 744 // to get_ready_callback (see issue 745 // https://github.com/rust-lang/miri/issues/1763). In this case, 746 // just do nothing, which effectively just returns to the 747 // scheduler. 748 return Ok(()); 749 }; 750 // This back-and-forth with `set_active_thread` is here because of two 751 // design decisions: 752 // 1. Make the caller and not the callback responsible for changing 753 // thread. 754 // 2. Make the scheduler the only place that can change the active 755 // thread. 756 let old_thread = this.set_active_thread(thread); 757 callback.call(this)?; 758 this.set_active_thread(old_thread); 759 Ok(()) 760 } 761 762 #[inline] run_on_stack_empty(&mut self) -> InterpResult<'tcx, Poll<()>>763 fn run_on_stack_empty(&mut self) -> InterpResult<'tcx, Poll<()>> { 764 let this = self.eval_context_mut(); 765 let mut callback = this 766 .active_thread_mut() 767 .on_stack_empty 768 .take() 769 .expect("`on_stack_empty` not set up, or already running"); 770 let res = callback(this)?; 771 this.active_thread_mut().on_stack_empty = Some(callback); 772 Ok(res) 773 } 774 } 775 776 // Public interface to thread management. 
impl<'mir, 'tcx: 'mir> EvalContextExt<'mir, 'tcx> for crate::MiriInterpCx<'mir, 'tcx> {}
pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
    /// Look up the active thread's allocation for the given thread-local static,
    /// creating it from the static's initializer if it does not exist yet.
    fn get_or_create_thread_local_alloc(
        &mut self,
        def_id: DefId,
    ) -> InterpResult<'tcx, Pointer<Provenance>> {
        let this = self.eval_context_mut();
        if let Some(existing) = this.machine.threads.get_thread_local_alloc_id(def_id) {
            // This thread already has its own allocation for the static.
            return Ok(existing);
        }

        // No allocation yet for this thread: build one, starting from the
        // initial value of the static.
        if this.tcx.is_foreign_item(def_id) {
            throw_unsup_format!("foreign thread-local statics are not supported");
        }
        // We don't give a span -- statics don't need that, they cannot be generic or associated.
        let initializer = this.ctfe_query(None, |tcx| tcx.eval_static_initializer(def_id))?;
        let mut fresh = initializer.inner().clone();
        // This allocation will be deallocated when the thread dies, so it is not in read-only memory.
        fresh.mutability = Mutability::Mut;
        // Turn the copied contents into a new allocation and remember it for this thread.
        let ptr = this.allocate_raw_ptr(fresh, MiriMemoryKind::Tls.into())?;
        this.machine.threads.set_thread_local_alloc(def_id, ptr);
        Ok(ptr)
    }

    /// Start a regular (non-main) thread.
    #[inline]
    fn start_regular_thread(
        &mut self,
        thread: Option<MPlaceTy<'tcx, Provenance>>,
        start_routine: Pointer<Option<Provenance>>,
        start_abi: Abi,
        func_arg: ImmTy<'tcx, Provenance>,
        ret_layout: TyAndLayout<'tcx>,
    ) -> InterpResult<'tcx, ThreadId> {
        let this = self.eval_context_mut();

        // Register the new thread; its stack-empty callback is driven by the TLS state.
        let mut tls_state = tls::TlsDtorsState::default();
        let spawned = this
            .machine
            .threads
            .create_thread(Box::new(move |m| tls_state.on_stack_empty(m)));
        let span = this.machine.current_span();
        if let Some(data_race) = &mut this.machine.data_race {
            data_race.thread_created(&this.machine.threads, spawned, span);
        }

        // Store the new thread's id while the spawning thread is still active, so
        // that this write is attributed to the spawning thread.
        if let Some(id_place) = thread {
            let id_value = Scalar::from_uint(spawned.to_u32(), id_place.layout.size);
            this.write_scalar(id_value, &id_place.into())?;
        }

        // From here on, everything happens in the context of the new thread, so
        // that its first stack frame can be pushed.
        let spawner = this.set_active_thread(spawned);

        // Perform the function pointer load in the new thread frame.
        let instance = this.get_ptr_fn(start_routine)?.as_instance()?;

        // Note: the returned value is currently ignored (see the FIXME in
        // pthread_join in shims/unix/thread.rs) because the Rust standard library does not use
        // it.
        let ret_place = this.allocate(ret_layout, MiriMemoryKind::Machine.into())?;

        this.call_function(
            instance,
            start_abi,
            &[*func_arg],
            Some(&ret_place.into()),
            StackPopCleanup::Root { cleanup: true },
        )?;

        // Hand control back to the spawning thread.
        this.set_active_thread(spawner);

        Ok(spawned)
    }

    /// Detach the given thread; forwards to the thread manager.
    #[inline]
    fn detach_thread(
        &mut self,
        thread_id: ThreadId,
        allow_terminated_joined: bool,
    ) -> InterpResult<'tcx> {
        self.eval_context_mut().machine.threads.detach_thread(thread_id, allow_terminated_joined)
    }

    /// Make the active thread join the given thread; forwards to the thread manager.
    #[inline]
    fn join_thread(&mut self, joined_thread_id: ThreadId) -> InterpResult<'tcx> {
        let this = self.eval_context_mut();
        let data_race = this.machine.data_race.as_mut();
        this.machine.threads.join_thread(joined_thread_id, data_race)?;
        Ok(())
    }

    /// Make the active thread exclusively join the given thread; forwards to the
    /// thread manager.
    #[inline]
    fn join_thread_exclusive(&mut self, joined_thread_id: ThreadId) -> InterpResult<'tcx> {
        let this = self.eval_context_mut();
        let data_race = this.machine.data_race.as_mut();
        this.machine.threads.join_thread_exclusive(joined_thread_id, data_race)?;
        Ok(())
    }

    /// Switch the active thread and return the id of the previously active one.
    #[inline]
    fn set_active_thread(&mut self, thread_id: ThreadId) -> ThreadId {
        self.eval_context_mut().machine.threads.set_active_thread_id(thread_id)
    }

    /// Return the id of the currently active thread.
    #[inline]
    fn get_active_thread(&self) -> ThreadId {
        self.eval_context_ref().machine.threads.get_active_thread_id()
    }

    #[inline]
    fn active_thread_mut(&mut self) -> &mut Thread<'mir, 'tcx> {
        let
this = self.eval_context_mut(); 908 this.machine.threads.active_thread_mut() 909 } 910 911 #[inline] active_thread_ref(&self) -> &Thread<'mir, 'tcx>912 fn active_thread_ref(&self) -> &Thread<'mir, 'tcx> { 913 let this = self.eval_context_ref(); 914 this.machine.threads.active_thread_ref() 915 } 916 917 #[inline] get_total_thread_count(&self) -> usize918 fn get_total_thread_count(&self) -> usize { 919 let this = self.eval_context_ref(); 920 this.machine.threads.get_total_thread_count() 921 } 922 923 #[inline] have_all_terminated(&self) -> bool924 fn have_all_terminated(&self) -> bool { 925 let this = self.eval_context_ref(); 926 this.machine.threads.have_all_terminated() 927 } 928 929 #[inline] enable_thread(&mut self, thread_id: ThreadId)930 fn enable_thread(&mut self, thread_id: ThreadId) { 931 let this = self.eval_context_mut(); 932 this.machine.threads.enable_thread(thread_id); 933 } 934 935 #[inline] active_thread_stack(&self) -> &[Frame<'mir, 'tcx, Provenance, FrameExtra<'tcx>>]936 fn active_thread_stack(&self) -> &[Frame<'mir, 'tcx, Provenance, FrameExtra<'tcx>>] { 937 let this = self.eval_context_ref(); 938 this.machine.threads.active_thread_stack() 939 } 940 941 #[inline] active_thread_stack_mut( &mut self, ) -> &mut Vec<Frame<'mir, 'tcx, Provenance, FrameExtra<'tcx>>>942 fn active_thread_stack_mut( 943 &mut self, 944 ) -> &mut Vec<Frame<'mir, 'tcx, Provenance, FrameExtra<'tcx>>> { 945 let this = self.eval_context_mut(); 946 this.machine.threads.active_thread_stack_mut() 947 } 948 949 /// Set the name of the current thread. The buffer must not include the null terminator. 
950 #[inline] set_thread_name(&mut self, thread: ThreadId, new_thread_name: Vec<u8>)951 fn set_thread_name(&mut self, thread: ThreadId, new_thread_name: Vec<u8>) { 952 let this = self.eval_context_mut(); 953 this.machine.threads.set_thread_name(thread, new_thread_name); 954 } 955 956 #[inline] set_thread_name_wide(&mut self, thread: ThreadId, new_thread_name: &[u16])957 fn set_thread_name_wide(&mut self, thread: ThreadId, new_thread_name: &[u16]) { 958 let this = self.eval_context_mut(); 959 960 // The Windows `GetThreadDescription` shim to get the thread name isn't implemented, so being lossy is okay. 961 // This is only read by diagnostics, which already use `from_utf8_lossy`. 962 this.machine 963 .threads 964 .set_thread_name(thread, String::from_utf16_lossy(new_thread_name).into_bytes()); 965 } 966 967 #[inline] get_thread_name<'c>(&'c self, thread: ThreadId) -> &'c [u8] where 'mir: 'c,968 fn get_thread_name<'c>(&'c self, thread: ThreadId) -> &'c [u8] 969 where 970 'mir: 'c, 971 { 972 self.eval_context_ref().machine.threads.get_thread_name(thread) 973 } 974 975 #[inline] block_thread(&mut self, thread: ThreadId)976 fn block_thread(&mut self, thread: ThreadId) { 977 self.eval_context_mut().machine.threads.block_thread(thread); 978 } 979 980 #[inline] unblock_thread(&mut self, thread: ThreadId)981 fn unblock_thread(&mut self, thread: ThreadId) { 982 self.eval_context_mut().machine.threads.unblock_thread(thread); 983 } 984 985 #[inline] yield_active_thread(&mut self)986 fn yield_active_thread(&mut self) { 987 self.eval_context_mut().machine.threads.yield_active_thread(); 988 } 989 990 #[inline] maybe_preempt_active_thread(&mut self)991 fn maybe_preempt_active_thread(&mut self) { 992 use rand::Rng as _; 993 994 let this = self.eval_context_mut(); 995 if this.machine.rng.get_mut().gen_bool(this.machine.preemption_rate) { 996 this.yield_active_thread(); 997 } 998 } 999 1000 #[inline] register_timeout_callback( &mut self, thread: ThreadId, call_time: Time, callback: 
TimeoutCallback<'mir, 'tcx>, )1001 fn register_timeout_callback( 1002 &mut self, 1003 thread: ThreadId, 1004 call_time: Time, 1005 callback: TimeoutCallback<'mir, 'tcx>, 1006 ) { 1007 let this = self.eval_context_mut(); 1008 if !this.machine.communicate() && matches!(call_time, Time::RealTime(..)) { 1009 panic!("cannot have `RealTime` callback with isolation enabled!") 1010 } 1011 this.machine.threads.register_timeout_callback(thread, call_time, callback); 1012 } 1013 1014 #[inline] unregister_timeout_callback_if_exists(&mut self, thread: ThreadId)1015 fn unregister_timeout_callback_if_exists(&mut self, thread: ThreadId) { 1016 let this = self.eval_context_mut(); 1017 this.machine.threads.unregister_timeout_callback_if_exists(thread); 1018 } 1019 1020 /// Run the core interpreter loop. Returns only when an interrupt occurs (an error or program 1021 /// termination). run_threads(&mut self) -> InterpResult<'tcx, !>1022 fn run_threads(&mut self) -> InterpResult<'tcx, !> { 1023 static SIGNALED: AtomicBool = AtomicBool::new(false); 1024 ctrlc::set_handler(move || { 1025 // Indicate that we have ben signaled to stop. If we were already signaled, exit 1026 // immediately. In our interpreter loop we try to consult this value often, but if for 1027 // whatever reason we don't get to that check or the cleanup we do upon finding that 1028 // this bool has become true takes a long time, the exit here will promptly exit the 1029 // process on the second Ctrl-C. 1030 if SIGNALED.swap(true, Relaxed) { 1031 std::process::exit(1); 1032 } 1033 }) 1034 .unwrap(); 1035 let this = self.eval_context_mut(); 1036 loop { 1037 if SIGNALED.load(Relaxed) { 1038 this.machine.handle_abnormal_termination(); 1039 std::process::exit(1); 1040 } 1041 match this.machine.threads.schedule(&this.machine.clock)? { 1042 SchedulingAction::ExecuteStep => { 1043 if !this.step()? { 1044 // See if this thread can do something else. 1045 match this.run_on_stack_empty()? 
{ 1046 Poll::Pending => {} // keep going 1047 Poll::Ready(()) => this.terminate_active_thread()?, 1048 } 1049 } 1050 } 1051 SchedulingAction::ExecuteTimeoutCallback => { 1052 this.run_timeout_callback()?; 1053 } 1054 SchedulingAction::Sleep(duration) => { 1055 this.machine.clock.sleep(duration); 1056 } 1057 } 1058 } 1059 } 1060 1061 /// Handles thread termination of the active thread: wakes up threads joining on this one, 1062 /// and deallocated thread-local statics. 1063 /// 1064 /// This is called by the eval loop when a thread's on_stack_empty returns `Ready`. 1065 #[inline] terminate_active_thread(&mut self) -> InterpResult<'tcx>1066 fn terminate_active_thread(&mut self) -> InterpResult<'tcx> { 1067 let this = self.eval_context_mut(); 1068 let thread = this.active_thread_mut(); 1069 assert!(thread.stack.is_empty(), "only threads with an empty stack can be terminated"); 1070 thread.state = ThreadState::Terminated; 1071 1072 let current_span = this.machine.current_span(); 1073 for ptr in 1074 this.machine.threads.thread_terminated(this.machine.data_race.as_mut(), current_span) 1075 { 1076 this.deallocate_ptr(ptr.into(), None, MiriMemoryKind::Tls.into())?; 1077 } 1078 Ok(()) 1079 } 1080 } 1081