• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! Implements threads.
2 
3 use std::cell::RefCell;
4 use std::collections::hash_map::Entry;
5 use std::num::TryFromIntError;
6 use std::sync::atomic::{AtomicBool, Ordering::Relaxed};
7 use std::task::Poll;
8 use std::time::{Duration, SystemTime};
9 
10 use log::trace;
11 
12 use rustc_data_structures::fx::FxHashMap;
13 use rustc_hir::def_id::DefId;
14 use rustc_index::{Idx, IndexVec};
15 use rustc_middle::mir::Mutability;
16 use rustc_middle::ty::layout::TyAndLayout;
17 use rustc_span::Span;
18 use rustc_target::spec::abi::Abi;
19 
20 use crate::concurrency::data_race;
21 use crate::concurrency::sync::SynchronizationState;
22 use crate::shims::tls;
23 use crate::*;
24 
/// What the scheduler decided the interpreter should do next.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum SchedulingAction {
    /// Execute step on the active thread.
    ExecuteStep,
    /// Execute a timeout callback.
    ExecuteTimeoutCallback,
    /// Wait for a bit, until there is a timeout to be called.
    Sleep(Duration),
}
34 
/// Trait for callbacks that can be executed when some event happens, such as after a timeout.
pub trait MachineCallback<'mir, 'tcx>: VisitTags {
    /// Run the callback on the given interpreter context.
    fn call(&self, ecx: &mut InterpCx<'mir, 'tcx, MiriMachine<'mir, 'tcx>>) -> InterpResult<'tcx>;
}
39 
/// An owned, boxed callback; the form in which timeout callbacks are stored and passed around.
type TimeoutCallback<'mir, 'tcx> = Box<dyn MachineCallback<'mir, 'tcx> + 'tcx>;
/// A thread identifier.
/// Backed by a `u32`; also used as the index into `ThreadManager::threads`.
#[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)]
pub struct ThreadId(u32);
45 
46 impl ThreadId {
to_u32(self) -> u3247     pub fn to_u32(self) -> u32 {
48         self.0
49     }
50 }
51 
52 impl Idx for ThreadId {
new(idx: usize) -> Self53     fn new(idx: usize) -> Self {
54         ThreadId(u32::try_from(idx).unwrap())
55     }
56 
index(self) -> usize57     fn index(self) -> usize {
58         usize::try_from(self.0).unwrap()
59     }
60 }
61 
62 impl TryFrom<u64> for ThreadId {
63     type Error = TryFromIntError;
try_from(id: u64) -> Result<Self, Self::Error>64     fn try_from(id: u64) -> Result<Self, Self::Error> {
65         u32::try_from(id).map(Self)
66     }
67 }
68 
69 impl From<u32> for ThreadId {
from(id: u32) -> Self70     fn from(id: u32) -> Self {
71         Self(id)
72     }
73 }
74 
75 impl From<ThreadId> for u64 {
from(t: ThreadId) -> Self76     fn from(t: ThreadId) -> Self {
77         t.0.into()
78     }
79 }
80 
/// The state of a thread.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ThreadState {
    /// The thread is enabled and can be executed.
    Enabled,
    /// The thread tried to join the specified thread and is blocked until that
    /// thread terminates.
    BlockedOnJoin(ThreadId),
    /// The thread is blocked on some synchronization primitive. It is the
    /// responsibility of the synchronization primitives to track threads that
    /// are blocked by them.
    BlockedOnSync,
    /// The thread has terminated its execution. We do not delete terminated
    /// threads (FIXME: why?).
    Terminated,
}
97 
/// The join status of a thread.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum ThreadJoinStatus {
    /// The thread can be joined.
    Joinable,
    /// A thread is detached if its join handle was destroyed and no other
    /// thread can join it.
    Detached,
    /// The thread was already joined by some thread and cannot be joined again.
    Joined,
}
109 
/// A thread.
pub struct Thread<'mir, 'tcx> {
    /// Current scheduling state (enabled / blocked / terminated).
    state: ThreadState,

    /// Name of the thread (`None` if it was never set).
    thread_name: Option<Vec<u8>>,

    /// The virtual call stack.
    stack: Vec<Frame<'mir, 'tcx, Provenance, FrameExtra<'tcx>>>,

    /// The function to call when the stack ran empty, to figure out what to do next.
    /// Conceptually, this is the interpreter implementation of the things that happen 'after' the
    /// Rust language entry point for this thread returns (usually implemented by the C or OS runtime).
    /// (`None` is an error, it means the callback has not been set up yet or is actively running.)
    pub(crate) on_stack_empty: Option<StackEmptyCallback<'mir, 'tcx>>,

    /// The index of the topmost user-relevant frame in `stack`. This field must contain
    /// the value produced by `compute_top_user_relevant_frame`.
    /// The `None` state here represents a stack that has no user-relevant frame at all.
    /// This field is a cache to reduce how often we call that method. The cache is manually
    /// maintained inside `MiriMachine::after_stack_push` and `MiriMachine::after_stack_pop`.
    top_user_relevant_frame: Option<usize>,

    /// The join status.
    join_status: ThreadJoinStatus,

    /// Stack of active panic payloads for the current thread. Used for storing
    /// the argument of the call to `miri_start_panic` (the panic payload) when unwinding.
    /// This is pointer-sized, and matches the `Payload` type in `src/libpanic_unwind/miri.rs`.
    ///
    /// In real unwinding, the payload gets passed as an argument to the landing pad,
    /// which then forwards it to 'Resume'. However this argument is implicit in MIR,
    /// so we have to store it out-of-band. When there are multiple active unwinds,
    /// the innermost one is always caught first, so we can store them as a stack.
    pub(crate) panic_payloads: Vec<Scalar<Provenance>>,

    /// Last OS error location in memory. It is a 32-bit integer.
    pub(crate) last_error: Option<MPlaceTy<'tcx, Provenance>>,
}
149 
/// The callback stored in `Thread::on_stack_empty`, run by the interpreter whenever that
/// thread's call stack becomes empty (see the field documentation there for its role).
pub type StackEmptyCallback<'mir, 'tcx> =
    Box<dyn FnMut(&mut MiriInterpCx<'mir, 'tcx>) -> InterpResult<'tcx, Poll<()>>>;
152 
153 impl<'mir, 'tcx> Thread<'mir, 'tcx> {
154     /// Get the name of the current thread, or `<unnamed>` if it was not set.
thread_name(&self) -> &[u8]155     fn thread_name(&self) -> &[u8] {
156         if let Some(ref thread_name) = self.thread_name { thread_name } else { b"<unnamed>" }
157     }
158 
159     /// Return the top user-relevant frame, if there is one.
160     /// Note that the choice to return `None` here when there is no user-relevant frame is part of
161     /// justifying the optimization that only pushes of user-relevant frames require updating the
162     /// `top_user_relevant_frame` field.
compute_top_user_relevant_frame(&self) -> Option<usize>163     fn compute_top_user_relevant_frame(&self) -> Option<usize> {
164         self.stack
165             .iter()
166             .enumerate()
167             .rev()
168             .find_map(|(idx, frame)| if frame.extra.is_user_relevant { Some(idx) } else { None })
169     }
170 
171     /// Re-compute the top user-relevant frame from scratch.
recompute_top_user_relevant_frame(&mut self)172     pub fn recompute_top_user_relevant_frame(&mut self) {
173         self.top_user_relevant_frame = self.compute_top_user_relevant_frame();
174     }
175 
176     /// Set the top user-relevant frame to the given value. Must be equal to what
177     /// `get_top_user_relevant_frame` would return!
set_top_user_relevant_frame(&mut self, frame_idx: usize)178     pub fn set_top_user_relevant_frame(&mut self, frame_idx: usize) {
179         debug_assert_eq!(Some(frame_idx), self.compute_top_user_relevant_frame());
180         self.top_user_relevant_frame = Some(frame_idx);
181     }
182 
183     /// Returns the topmost frame that is considered user-relevant, or the
184     /// top of the stack if there is no such frame, or `None` if the stack is empty.
top_user_relevant_frame(&self) -> Option<usize>185     pub fn top_user_relevant_frame(&self) -> Option<usize> {
186         debug_assert_eq!(self.top_user_relevant_frame, self.compute_top_user_relevant_frame());
187         // This can be called upon creation of an allocation. We create allocations while setting up
188         // parts of the Rust runtime when we do not have any stack frames yet, so we need to handle
189         // empty stacks.
190         self.top_user_relevant_frame.or_else(|| self.stack.len().checked_sub(1))
191     }
192 }
193 
194 impl<'mir, 'tcx> std::fmt::Debug for Thread<'mir, 'tcx> {
fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result195     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
196         write!(
197             f,
198             "{}({:?}, {:?})",
199             String::from_utf8_lossy(self.thread_name()),
200             self.state,
201             self.join_status
202         )
203     }
204 }
205 
206 impl<'mir, 'tcx> Thread<'mir, 'tcx> {
new(name: Option<&str>, on_stack_empty: Option<StackEmptyCallback<'mir, 'tcx>>) -> Self207     fn new(name: Option<&str>, on_stack_empty: Option<StackEmptyCallback<'mir, 'tcx>>) -> Self {
208         Self {
209             state: ThreadState::Enabled,
210             thread_name: name.map(|name| Vec::from(name.as_bytes())),
211             stack: Vec::new(),
212             top_user_relevant_frame: None,
213             join_status: ThreadJoinStatus::Joinable,
214             panic_payloads: Vec::new(),
215             last_error: None,
216             on_stack_empty,
217         }
218     }
219 }
220 
221 impl VisitTags for Thread<'_, '_> {
visit_tags(&self, visit: &mut dyn FnMut(BorTag))222     fn visit_tags(&self, visit: &mut dyn FnMut(BorTag)) {
223         let Thread {
224             panic_payloads: panic_payload,
225             last_error,
226             stack,
227             top_user_relevant_frame: _,
228             state: _,
229             thread_name: _,
230             join_status: _,
231             on_stack_empty: _, // we assume the closure captures no GC-relevant state
232         } = self;
233 
234         for payload in panic_payload {
235             payload.visit_tags(visit);
236         }
237         last_error.visit_tags(visit);
238         for frame in stack {
239             frame.visit_tags(visit)
240         }
241     }
242 }
243 
impl VisitTags for Frame<'_, '_, Provenance, FrameExtra<'_>> {
    fn visit_tags(&self, visit: &mut dyn FnMut(BorTag)) {
        // Destructure so new public fields of `Frame` are not silently ignored.
        let Frame {
            return_place,
            locals,
            extra,
            body: _,
            instance: _,
            return_to_block: _,
            loc: _,
            // There are some private fields we cannot access; they contain no tags.
            ..
        } = self;

        // Return place.
        return_place.visit_tags(visit);
        // Locals: only live locals hold values (and thus possibly tags).
        for local in locals.iter() {
            if let LocalValue::Live(value) = &local.value {
                value.visit_tags(visit);
            }
        }

        extra.visit_tags(visit);
    }
}
270 
/// A specific moment in time.
#[derive(Debug)]
pub enum Time {
    /// A point on the monotonic clock (compared against `Clock::now`).
    Monotonic(Instant),
    /// A point on the wall clock (compared against `SystemTime::now`).
    RealTime(SystemTime),
}
277 
278 impl Time {
279     /// How long do we have to wait from now until the specified time?
get_wait_time(&self, clock: &Clock) -> Duration280     fn get_wait_time(&self, clock: &Clock) -> Duration {
281         match self {
282             Time::Monotonic(instant) => instant.duration_since(clock.now()),
283             Time::RealTime(time) =>
284                 time.duration_since(SystemTime::now()).unwrap_or(Duration::new(0, 0)),
285         }
286     }
287 }
288 
/// Callbacks are used to implement timeouts. For example, waiting on a
/// conditional variable with a timeout creates a callback that is called after
/// the specified time and unblocks the thread. If another thread signals on the
/// conditional variable, the signal handler deletes the callback.
struct TimeoutCallbackInfo<'mir, 'tcx> {
    /// The callback should be called no earlier than this time.
    call_time: Time,
    /// The called function.
    callback: TimeoutCallback<'mir, 'tcx>,
}
299 
300 impl<'mir, 'tcx> std::fmt::Debug for TimeoutCallbackInfo<'mir, 'tcx> {
fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result301     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
302         write!(f, "TimeoutCallback({:?})", self.call_time)
303     }
304 }
305 
/// A set of threads.
#[derive(Debug)]
pub struct ThreadManager<'mir, 'tcx> {
    /// Identifier of the currently active thread.
    active_thread: ThreadId,
    /// Threads used in the program.
    ///
    /// Note that this vector also contains terminated threads.
    threads: IndexVec<ThreadId, Thread<'mir, 'tcx>>,
    /// This field is pub(crate) because the synchronization primitives
    /// (`crate::sync`) need a way to access it.
    pub(crate) sync: SynchronizationState<'mir, 'tcx>,
    /// A mapping from a thread-local static to an allocation id of a thread
    /// specific allocation.
    thread_local_alloc_ids: RefCell<FxHashMap<(DefId, ThreadId), Pointer<Provenance>>>,
    /// A flag that indicates that we should change the active thread.
    /// (Set by `yield_active_thread`; the scheduler acts on it and resets it.)
    yield_active_thread: bool,
    /// Callbacks that are called once the specified time passes.
    timeout_callbacks: FxHashMap<ThreadId, TimeoutCallbackInfo<'mir, 'tcx>>,
}
326 
327 impl VisitTags for ThreadManager<'_, '_> {
visit_tags(&self, visit: &mut dyn FnMut(BorTag))328     fn visit_tags(&self, visit: &mut dyn FnMut(BorTag)) {
329         let ThreadManager {
330             threads,
331             thread_local_alloc_ids,
332             timeout_callbacks,
333             active_thread: _,
334             yield_active_thread: _,
335             sync,
336         } = self;
337 
338         for thread in threads {
339             thread.visit_tags(visit);
340         }
341         for ptr in thread_local_alloc_ids.borrow().values() {
342             ptr.visit_tags(visit);
343         }
344         for callback in timeout_callbacks.values() {
345             callback.callback.visit_tags(visit);
346         }
347         sync.visit_tags(visit);
348     }
349 }
350 
351 impl<'mir, 'tcx> Default for ThreadManager<'mir, 'tcx> {
default() -> Self352     fn default() -> Self {
353         let mut threads = IndexVec::new();
354         // Create the main thread and add it to the list of threads.
355         threads.push(Thread::new(Some("main"), None));
356         Self {
357             active_thread: ThreadId::new(0),
358             threads,
359             sync: SynchronizationState::default(),
360             thread_local_alloc_ids: Default::default(),
361             yield_active_thread: false,
362             timeout_callbacks: FxHashMap::default(),
363         }
364     }
365 }
366 
367 impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
init( ecx: &mut MiriInterpCx<'mir, 'tcx>, on_main_stack_empty: StackEmptyCallback<'mir, 'tcx>, )368     pub(crate) fn init(
369         ecx: &mut MiriInterpCx<'mir, 'tcx>,
370         on_main_stack_empty: StackEmptyCallback<'mir, 'tcx>,
371     ) {
372         ecx.machine.threads.threads[ThreadId::new(0)].on_stack_empty = Some(on_main_stack_empty);
373         if ecx.tcx.sess.target.os.as_ref() != "windows" {
374             // The main thread can *not* be joined on except on windows.
375             ecx.machine.threads.threads[ThreadId::new(0)].join_status = ThreadJoinStatus::Detached;
376         }
377     }
378 
379     /// Check if we have an allocation for the given thread local static for the
380     /// active thread.
get_thread_local_alloc_id(&self, def_id: DefId) -> Option<Pointer<Provenance>>381     fn get_thread_local_alloc_id(&self, def_id: DefId) -> Option<Pointer<Provenance>> {
382         self.thread_local_alloc_ids.borrow().get(&(def_id, self.active_thread)).cloned()
383     }
384 
385     /// Set the pointer for the allocation of the given thread local
386     /// static for the active thread.
387     ///
388     /// Panics if a thread local is initialized twice for the same thread.
set_thread_local_alloc(&self, def_id: DefId, ptr: Pointer<Provenance>)389     fn set_thread_local_alloc(&self, def_id: DefId, ptr: Pointer<Provenance>) {
390         self.thread_local_alloc_ids
391             .borrow_mut()
392             .try_insert((def_id, self.active_thread), ptr)
393             .unwrap();
394     }
395 
396     /// Borrow the stack of the active thread.
active_thread_stack(&self) -> &[Frame<'mir, 'tcx, Provenance, FrameExtra<'tcx>>]397     pub fn active_thread_stack(&self) -> &[Frame<'mir, 'tcx, Provenance, FrameExtra<'tcx>>] {
398         &self.threads[self.active_thread].stack
399     }
400 
401     /// Mutably borrow the stack of the active thread.
active_thread_stack_mut( &mut self, ) -> &mut Vec<Frame<'mir, 'tcx, Provenance, FrameExtra<'tcx>>>402     fn active_thread_stack_mut(
403         &mut self,
404     ) -> &mut Vec<Frame<'mir, 'tcx, Provenance, FrameExtra<'tcx>>> {
405         &mut self.threads[self.active_thread].stack
406     }
407 
all_stacks( &self, ) -> impl Iterator<Item = &[Frame<'mir, 'tcx, Provenance, FrameExtra<'tcx>>]>408     pub fn all_stacks(
409         &self,
410     ) -> impl Iterator<Item = &[Frame<'mir, 'tcx, Provenance, FrameExtra<'tcx>>]> {
411         self.threads.iter().map(|t| &t.stack[..])
412     }
413 
414     /// Create a new thread and returns its id.
create_thread(&mut self, on_stack_empty: StackEmptyCallback<'mir, 'tcx>) -> ThreadId415     fn create_thread(&mut self, on_stack_empty: StackEmptyCallback<'mir, 'tcx>) -> ThreadId {
416         let new_thread_id = ThreadId::new(self.threads.len());
417         self.threads.push(Thread::new(None, Some(on_stack_empty)));
418         new_thread_id
419     }
420 
421     /// Set an active thread and return the id of the thread that was active before.
set_active_thread_id(&mut self, id: ThreadId) -> ThreadId422     fn set_active_thread_id(&mut self, id: ThreadId) -> ThreadId {
423         let active_thread_id = self.active_thread;
424         self.active_thread = id;
425         assert!(self.active_thread.index() < self.threads.len());
426         active_thread_id
427     }
428 
    /// Get the id of the currently active thread.
    pub fn get_active_thread_id(&self) -> ThreadId {
        self.active_thread
    }
433 
    /// Get the total number of threads that were ever spawn by this program,
    /// including terminated ones (they are never removed from `threads`).
    pub fn get_total_thread_count(&self) -> usize {
        self.threads.len()
    }
438 
439     /// Get the total of threads that are currently live, i.e., not yet terminated.
440     /// (They might be blocked.)
get_live_thread_count(&self) -> usize441     pub fn get_live_thread_count(&self) -> usize {
442         self.threads.iter().filter(|t| !matches!(t.state, ThreadState::Terminated)).count()
443     }
444 
445     /// Has the given thread terminated?
has_terminated(&self, thread_id: ThreadId) -> bool446     fn has_terminated(&self, thread_id: ThreadId) -> bool {
447         self.threads[thread_id].state == ThreadState::Terminated
448     }
449 
450     /// Have all threads terminated?
have_all_terminated(&self) -> bool451     fn have_all_terminated(&self) -> bool {
452         self.threads.iter().all(|thread| thread.state == ThreadState::Terminated)
453     }
454 
    /// Enable the thread for execution. The thread must be terminated
    /// (asserted below), i.e. this re-enables a previously finished thread.
    fn enable_thread(&mut self, thread_id: ThreadId) {
        assert!(self.has_terminated(thread_id));
        self.threads[thread_id].state = ThreadState::Enabled;
    }
460 
    /// Get a mutable borrow of the currently active thread.
    pub fn active_thread_mut(&mut self) -> &mut Thread<'mir, 'tcx> {
        &mut self.threads[self.active_thread]
    }
465 
    /// Get a shared borrow of the currently active thread.
    pub fn active_thread_ref(&self) -> &Thread<'mir, 'tcx> {
        &self.threads[self.active_thread]
    }
470 
471     /// Mark the thread as detached, which means that no other thread will try
472     /// to join it and the thread is responsible for cleaning up.
473     ///
474     /// `allow_terminated_joined` allows detaching joined threads that have already terminated.
475     /// This matches Windows's behavior for `CloseHandle`.
476     ///
477     /// See <https://docs.microsoft.com/en-us/windows/win32/procthread/thread-handles-and-identifiers>:
478     /// > The handle is valid until closed, even after the thread it represents has been terminated.
detach_thread(&mut self, id: ThreadId, allow_terminated_joined: bool) -> InterpResult<'tcx>479     fn detach_thread(&mut self, id: ThreadId, allow_terminated_joined: bool) -> InterpResult<'tcx> {
480         trace!("detaching {:?}", id);
481 
482         let is_ub = if allow_terminated_joined && self.threads[id].state == ThreadState::Terminated
483         {
484             // "Detached" in particular means "not yet joined". Redundant detaching is still UB.
485             self.threads[id].join_status == ThreadJoinStatus::Detached
486         } else {
487             self.threads[id].join_status != ThreadJoinStatus::Joinable
488         };
489         if is_ub {
490             throw_ub_format!("trying to detach thread that was already detached or joined");
491         }
492 
493         self.threads[id].join_status = ThreadJoinStatus::Detached;
494         Ok(())
495     }
496 
    /// Mark that the active thread tries to join the thread with `joined_thread_id`.
    /// Blocks the active thread if the target has not terminated yet; otherwise
    /// records the join happens-before edge immediately.
    fn join_thread(
        &mut self,
        joined_thread_id: ThreadId,
        data_race: Option<&mut data_race::GlobalState>,
    ) -> InterpResult<'tcx> {
        if self.threads[joined_thread_id].join_status == ThreadJoinStatus::Detached {
            // On Windows this corresponds to joining on a closed handle.
            throw_ub_format!("trying to join a detached thread");
        }

        // Mark the joined thread as being joined so that we detect if other
        // threads try to join it.
        self.threads[joined_thread_id].join_status = ThreadJoinStatus::Joined;
        if self.threads[joined_thread_id].state != ThreadState::Terminated {
            // The joined thread is still running, we need to wait for it.
            // (It will unblock us in `thread_terminated`.)
            self.active_thread_mut().state = ThreadState::BlockedOnJoin(joined_thread_id);
            trace!(
                "{:?} blocked on {:?} when trying to join",
                self.active_thread,
                joined_thread_id
            );
        } else {
            // The thread has already terminated - mark join happens-before
            if let Some(data_race) = data_race {
                data_race.thread_joined(self, self.active_thread, joined_thread_id);
            }
        }
        Ok(())
    }
527 
    /// Mark that the active thread tries to exclusively join the thread with `joined_thread_id`.
    /// If the thread is already joined by another thread, it will throw UB.
    fn join_thread_exclusive(
        &mut self,
        joined_thread_id: ThreadId,
        data_race: Option<&mut data_race::GlobalState>,
    ) -> InterpResult<'tcx> {
        if self.threads[joined_thread_id].join_status == ThreadJoinStatus::Joined {
            throw_ub_format!("trying to join an already joined thread");
        }

        if joined_thread_id == self.active_thread {
            throw_ub_format!("trying to join itself");
        }

        // Sanity check: since the join is exclusive, no other thread may already be
        // blocked waiting for the target's termination.
        assert!(
            self.threads
                .iter()
                .all(|thread| thread.state != ThreadState::BlockedOnJoin(joined_thread_id)),
            "this thread already has threads waiting for its termination"
        );

        self.join_thread(joined_thread_id, data_race)
    }
552 
    /// Set the name of the given thread (overwrites any previous name).
    pub fn set_thread_name(&mut self, thread: ThreadId, new_thread_name: Vec<u8>) {
        self.threads[thread].thread_name = Some(new_thread_name);
    }
557 
    /// Get the name of the given thread (`<unnamed>` if none was ever set).
    pub fn get_thread_name(&self, thread: ThreadId) -> &[u8] {
        self.threads[thread].thread_name()
    }
562 
563     /// Put the thread into the blocked state.
block_thread(&mut self, thread: ThreadId)564     fn block_thread(&mut self, thread: ThreadId) {
565         let state = &mut self.threads[thread].state;
566         assert_eq!(*state, ThreadState::Enabled);
567         *state = ThreadState::BlockedOnSync;
568     }
569 
570     /// Put the blocked thread into the enabled state.
unblock_thread(&mut self, thread: ThreadId)571     fn unblock_thread(&mut self, thread: ThreadId) {
572         let state = &mut self.threads[thread].state;
573         assert_eq!(*state, ThreadState::BlockedOnSync);
574         *state = ThreadState::Enabled;
575     }
576 
    /// Change the active thread to some enabled thread.
    /// (The actual switch is performed later by the scheduler, which checks this flag.)
    fn yield_active_thread(&mut self) {
        // We do not yield immediately, as swapping out the current stack while executing a MIR statement
        // could lead to all sorts of confusion.
        // We should only switch stacks between steps.
        self.yield_active_thread = true;
    }
584 
585     /// Register the given `callback` to be called once the `call_time` passes.
586     ///
587     /// The callback will be called with `thread` being the active thread, and
588     /// the callback may not change the active thread.
register_timeout_callback( &mut self, thread: ThreadId, call_time: Time, callback: TimeoutCallback<'mir, 'tcx>, )589     fn register_timeout_callback(
590         &mut self,
591         thread: ThreadId,
592         call_time: Time,
593         callback: TimeoutCallback<'mir, 'tcx>,
594     ) {
595         self.timeout_callbacks
596             .try_insert(thread, TimeoutCallbackInfo { call_time, callback })
597             .unwrap();
598     }
599 
    /// Unregister the callback for the `thread`, if any is registered.
    fn unregister_timeout_callback_if_exists(&mut self, thread: ThreadId) {
        self.timeout_callbacks.remove(&thread);
    }
604 
605     /// Get a callback that is ready to be called.
get_ready_callback( &mut self, clock: &Clock, ) -> Option<(ThreadId, TimeoutCallback<'mir, 'tcx>)>606     fn get_ready_callback(
607         &mut self,
608         clock: &Clock,
609     ) -> Option<(ThreadId, TimeoutCallback<'mir, 'tcx>)> {
610         // We iterate over all threads in the order of their indices because
611         // this allows us to have a deterministic scheduler.
612         for thread in self.threads.indices() {
613             match self.timeout_callbacks.entry(thread) {
614                 Entry::Occupied(entry) => {
615                     if entry.get().call_time.get_wait_time(clock) == Duration::new(0, 0) {
616                         return Some((thread, entry.remove().callback));
617                     }
618                 }
619                 Entry::Vacant(_) => {}
620             }
621         }
622         None
623     }
624 
    /// Wakes up threads joining on the active one and deallocates thread-local statics.
    /// The `AllocId` that can now be freed are returned.
    fn thread_terminated(
        &mut self,
        mut data_race: Option<&mut data_race::GlobalState>,
        current_span: Span,
    ) -> Vec<Pointer<Provenance>> {
        let mut free_tls_statics = Vec::new();
        {
            let mut thread_local_statics = self.thread_local_alloc_ids.borrow_mut();
            thread_local_statics.retain(|&(_def_id, thread), &mut alloc_id| {
                if thread != self.active_thread {
                    // Keep this static around.
                    return true;
                }
                // Delete this static from the map and from memory.
                // We cannot free directly here as we cannot use `?` in this context.
                free_tls_statics.push(alloc_id);
                false
            });
        }
        // Set the thread into a terminated state in the data-race detector.
        if let Some(ref mut data_race) = data_race {
            data_race.thread_terminated(self, current_span);
        }
        // Check if we need to unblock any threads.
        let mut joined_threads = vec![]; // store which threads joined, we'll need it
        for (i, thread) in self.threads.iter_enumerated_mut() {
            if thread.state == ThreadState::BlockedOnJoin(self.active_thread) {
                // The thread has terminated, mark happens-before edge to joining thread
                if data_race.is_some() {
                    joined_threads.push(i);
                }
                trace!("unblocking {:?} because {:?} terminated", i, self.active_thread);
                thread.state = ThreadState::Enabled;
            }
        }
        // Second loop: `thread_joined` needs `self` by shared borrow, which we cannot have
        // while iterating `self.threads` mutably above.
        for &i in &joined_threads {
            data_race.as_mut().unwrap().thread_joined(self, i, self.active_thread);
        }
        free_tls_statics
    }
667 
    /// Decide which action to take next and on which thread.
    ///
    /// The currently implemented scheduling policy is the one that is commonly
    /// used in stateless model checkers such as Loom: run the active thread as
    /// long as we can and switch only when we have to (the active thread was
    /// blocked, terminated, or has explicitly asked to be preempted).
    fn schedule(&mut self, clock: &Clock) -> InterpResult<'tcx, SchedulingAction> {
        // This thread and the program can keep going.
        if self.threads[self.active_thread].state == ThreadState::Enabled
            && !self.yield_active_thread
        {
            // The currently active thread is still enabled, just continue with it.
            return Ok(SchedulingAction::ExecuteStep);
        }
        // The active thread yielded or got terminated. Let's see if there are any timeouts to take
        // care of. We do this *before* running any other thread, to ensure that timeouts "in the
        // past" fire before any other thread can take an action. This ensures that for
        // `pthread_cond_timedwait`, "an error is returned if [...] the absolute time specified by
        // abstime has already been passed at the time of the call".
        // <https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_cond_timedwait.html>
        let potential_sleep_time =
            self.timeout_callbacks.values().map(|info| info.call_time.get_wait_time(clock)).min();
        if potential_sleep_time == Some(Duration::new(0, 0)) {
            // A callback is due *now*; run it before scheduling any thread.
            return Ok(SchedulingAction::ExecuteTimeoutCallback);
        }
        // No callbacks immediately scheduled, pick a regular thread to execute.
        // The active thread blocked or yielded. So we go search for another enabled thread.
        // Crucially, we start searching at the current active thread ID, rather than at 0, since we
        // want to avoid always scheduling threads 0 and 1 without ever making progress in thread 2.
        //
        // `skip(N)` means we start iterating at thread N, so we skip 1 more to start just *after*
        // the active thread. Then after that we look at `take(N)`, i.e., the threads *before* the
        // active thread.
        let threads = self
            .threads
            .iter_enumerated()
            .skip(self.active_thread.index() + 1)
            .chain(self.threads.iter_enumerated().take(self.active_thread.index()));
        for (id, thread) in threads {
            // The chain above visits every thread except the active one exactly once.
            debug_assert_ne!(self.active_thread, id);
            if thread.state == ThreadState::Enabled {
                self.active_thread = id;
                break;
            }
        }
        // The yield request (if any) has now been honored; clear it so the next call to
        // `schedule` does not treat the thread as yielding again.
        self.yield_active_thread = false;
        // If the active thread is enabled, run it. Note this also covers the case where the
        // active thread yielded but no *other* thread was enabled: `active_thread` is then
        // unchanged and we simply keep running it.
        if self.threads[self.active_thread].state == ThreadState::Enabled {
            return Ok(SchedulingAction::ExecuteStep);
        }
        // We have not found a thread to execute.
        if self.threads.iter().all(|thread| thread.state == ThreadState::Terminated) {
            unreachable!("all threads terminated without the main thread terminating?!");
        } else if let Some(sleep_time) = potential_sleep_time {
            // All threads are currently blocked, but we have unexecuted
            // timeout_callbacks, which may unblock some of the threads. Hence,
            // sleep until the first callback.
            Ok(SchedulingAction::Sleep(sleep_time))
        } else {
            // All threads are blocked and no timeout can ever wake one up: genuine deadlock.
            throw_machine_stop!(TerminationInfo::Deadlock);
        }
    }
729 }
730 
731 impl<'mir, 'tcx: 'mir> EvalContextPrivExt<'mir, 'tcx> for MiriInterpCx<'mir, 'tcx> {}
732 trait EvalContextPrivExt<'mir, 'tcx: 'mir>: MiriInterpCxExt<'mir, 'tcx> {
733     /// Execute a timeout callback on the callback's thread.
734     #[inline]
run_timeout_callback(&mut self) -> InterpResult<'tcx>735     fn run_timeout_callback(&mut self) -> InterpResult<'tcx> {
736         let this = self.eval_context_mut();
737         let (thread, callback) = if let Some((thread, callback)) =
738             this.machine.threads.get_ready_callback(&this.machine.clock)
739         {
740             (thread, callback)
741         } else {
742             // get_ready_callback can return None if the computer's clock
743             // was shifted after calling the scheduler and before the call
744             // to get_ready_callback (see issue
745             // https://github.com/rust-lang/miri/issues/1763). In this case,
746             // just do nothing, which effectively just returns to the
747             // scheduler.
748             return Ok(());
749         };
750         // This back-and-forth with `set_active_thread` is here because of two
751         // design decisions:
752         // 1. Make the caller and not the callback responsible for changing
753         //    thread.
754         // 2. Make the scheduler the only place that can change the active
755         //    thread.
756         let old_thread = this.set_active_thread(thread);
757         callback.call(this)?;
758         this.set_active_thread(old_thread);
759         Ok(())
760     }
761 
762     #[inline]
run_on_stack_empty(&mut self) -> InterpResult<'tcx, Poll<()>>763     fn run_on_stack_empty(&mut self) -> InterpResult<'tcx, Poll<()>> {
764         let this = self.eval_context_mut();
765         let mut callback = this
766             .active_thread_mut()
767             .on_stack_empty
768             .take()
769             .expect("`on_stack_empty` not set up, or already running");
770         let res = callback(this)?;
771         this.active_thread_mut().on_stack_empty = Some(callback);
772         Ok(res)
773     }
774 }
775 
// Public interface to thread management.
impl<'mir, 'tcx: 'mir> EvalContextExt<'mir, 'tcx> for crate::MiriInterpCx<'mir, 'tcx> {}
pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
    /// Get a thread-specific allocation id for the given thread-local static.
    /// If needed, allocate a new one.
    fn get_or_create_thread_local_alloc(
        &mut self,
        def_id: DefId,
    ) -> InterpResult<'tcx, Pointer<Provenance>> {
        let this = self.eval_context_mut();
        let tcx = this.tcx;
        if let Some(old_alloc) = this.machine.threads.get_thread_local_alloc_id(def_id) {
            // We already have a thread-specific allocation id for this
            // thread-local static.
            Ok(old_alloc)
        } else {
            // We need to allocate a thread-specific allocation id for this
            // thread-local static.
            // First, we compute the initial value for this static.
            if tcx.is_foreign_item(def_id) {
                throw_unsup_format!("foreign thread-local statics are not supported");
            }
            // We don't give a span -- statics don't need that, they cannot be generic or associated.
            let allocation = this.ctfe_query(None, |tcx| tcx.eval_static_initializer(def_id))?;
            let mut allocation = allocation.inner().clone();
            // This allocation will be deallocated when the thread dies, so it is not in read-only memory.
            allocation.mutability = Mutability::Mut;
            // Create a fresh allocation with this content.
            let new_alloc = this.allocate_raw_ptr(allocation, MiriMemoryKind::Tls.into())?;
            this.machine.threads.set_thread_local_alloc(def_id, new_alloc);
            Ok(new_alloc)
        }
    }

    /// Start a regular (non-main) thread.
    ///
    /// Creates the thread, notifies the data-race detector, optionally writes the new
    /// thread id into `thread` (e.g. the `pthread_t` the caller passed), and pushes the
    /// initial stack frame for `start_routine(func_arg)` on the new thread's stack.
    /// Returns the id of the newly created thread.
    #[inline]
    fn start_regular_thread(
        &mut self,
        thread: Option<MPlaceTy<'tcx, Provenance>>,
        start_routine: Pointer<Option<Provenance>>,
        start_abi: Abi,
        func_arg: ImmTy<'tcx, Provenance>,
        ret_layout: TyAndLayout<'tcx>,
    ) -> InterpResult<'tcx, ThreadId> {
        let this = self.eval_context_mut();

        // Create the new thread
        let new_thread_id = this.machine.threads.create_thread({
            let mut state = tls::TlsDtorsState::default();
            Box::new(move |m| state.on_stack_empty(m))
        });
        let current_span = this.machine.current_span();
        if let Some(data_race) = &mut this.machine.data_race {
            data_race.thread_created(&this.machine.threads, new_thread_id, current_span);
        }

        // Write the current thread-id, switch to the next thread later
        // to treat this write operation as occurring on the current thread.
        if let Some(thread_info_place) = thread {
            this.write_scalar(
                Scalar::from_uint(new_thread_id.to_u32(), thread_info_place.layout.size),
                &thread_info_place.into(),
            )?;
        }

        // Finally switch to new thread so that we can push the first stackframe.
        // After this all accesses will be treated as occurring in the new thread.
        let old_thread_id = this.set_active_thread(new_thread_id);

        // Perform the function pointer load in the new thread frame.
        let instance = this.get_ptr_fn(start_routine)?.as_instance()?;

        // Note: the returned value is currently ignored (see the FIXME in
        // pthread_join in shims/unix/thread.rs) because the Rust standard library does not use
        // it.
        let ret_place = this.allocate(ret_layout, MiriMemoryKind::Machine.into())?;

        this.call_function(
            instance,
            start_abi,
            &[*func_arg],
            Some(&ret_place.into()),
            StackPopCleanup::Root { cleanup: true },
        )?;

        // Restore the old active thread frame.
        this.set_active_thread(old_thread_id);

        Ok(new_thread_id)
    }

    /// Detach the given thread. The semantics of `allow_terminated_joined` are defined by
    /// `ThreadManager::detach_thread` (not visible in this file); presumably it controls
    /// whether detaching a thread that was already joined and terminated is an error --
    /// TODO confirm against the `ThreadManager` implementation.
    #[inline]
    fn detach_thread(
        &mut self,
        thread_id: ThreadId,
        allow_terminated_joined: bool,
    ) -> InterpResult<'tcx> {
        let this = self.eval_context_mut();
        this.machine.threads.detach_thread(thread_id, allow_terminated_joined)
    }

    /// Have the active thread join the given thread (waiting for it to terminate),
    /// informing the data-race detector. Delegates to `ThreadManager::join_thread`.
    #[inline]
    fn join_thread(&mut self, joined_thread_id: ThreadId) -> InterpResult<'tcx> {
        let this = self.eval_context_mut();
        this.machine.threads.join_thread(joined_thread_id, this.machine.data_race.as_mut())?;
        Ok(())
    }

    /// Like `join_thread`, but via `ThreadManager::join_thread_exclusive` -- presumably
    /// enforcing that only a single thread may join the target (Windows-style join);
    /// exact semantics live in `ThreadManager`.
    #[inline]
    fn join_thread_exclusive(&mut self, joined_thread_id: ThreadId) -> InterpResult<'tcx> {
        let this = self.eval_context_mut();
        this.machine
            .threads
            .join_thread_exclusive(joined_thread_id, this.machine.data_race.as_mut())?;
        Ok(())
    }

    /// Make `thread_id` the active thread, returning the previously active thread's id
    /// (callers in this file use the return value to restore the old thread afterwards).
    #[inline]
    fn set_active_thread(&mut self, thread_id: ThreadId) -> ThreadId {
        let this = self.eval_context_mut();
        this.machine.threads.set_active_thread_id(thread_id)
    }

    /// Get the id of the currently active thread.
    #[inline]
    fn get_active_thread(&self) -> ThreadId {
        let this = self.eval_context_ref();
        this.machine.threads.get_active_thread_id()
    }

    /// Get a mutable borrow of the currently active thread.
    #[inline]
    fn active_thread_mut(&mut self) -> &mut Thread<'mir, 'tcx> {
        let this = self.eval_context_mut();
        this.machine.threads.active_thread_mut()
    }

    /// Get a shared borrow of the currently active thread.
    #[inline]
    fn active_thread_ref(&self) -> &Thread<'mir, 'tcx> {
        let this = self.eval_context_ref();
        this.machine.threads.active_thread_ref()
    }

    /// Get the total number of threads that were ever created (including terminated ones) --
    /// TODO confirm "including terminated" against `ThreadManager::get_total_thread_count`.
    #[inline]
    fn get_total_thread_count(&self) -> usize {
        let this = self.eval_context_ref();
        this.machine.threads.get_total_thread_count()
    }

    /// Whether all threads have terminated.
    #[inline]
    fn have_all_terminated(&self) -> bool {
        let this = self.eval_context_ref();
        this.machine.threads.have_all_terminated()
    }

    /// Put the given thread into the `Enabled` state so the scheduler may run it.
    #[inline]
    fn enable_thread(&mut self, thread_id: ThreadId) {
        let this = self.eval_context_mut();
        this.machine.threads.enable_thread(thread_id);
    }

    /// Get a shared borrow of the active thread's stack of interpreter frames.
    #[inline]
    fn active_thread_stack(&self) -> &[Frame<'mir, 'tcx, Provenance, FrameExtra<'tcx>>] {
        let this = self.eval_context_ref();
        this.machine.threads.active_thread_stack()
    }

    /// Get a mutable borrow of the active thread's stack of interpreter frames.
    #[inline]
    fn active_thread_stack_mut(
        &mut self,
    ) -> &mut Vec<Frame<'mir, 'tcx, Provenance, FrameExtra<'tcx>>> {
        let this = self.eval_context_mut();
        this.machine.threads.active_thread_stack_mut()
    }

    /// Set the name of the current thread. The buffer must not include the null terminator.
    #[inline]
    fn set_thread_name(&mut self, thread: ThreadId, new_thread_name: Vec<u8>) {
        let this = self.eval_context_mut();
        this.machine.threads.set_thread_name(thread, new_thread_name);
    }

    /// Set the name of the given thread from a UTF-16 buffer (Windows shims),
    /// converting lossily to UTF-8 bytes.
    #[inline]
    fn set_thread_name_wide(&mut self, thread: ThreadId, new_thread_name: &[u16]) {
        let this = self.eval_context_mut();

        // The Windows `GetThreadDescription` shim to get the thread name isn't implemented, so being lossy is okay.
        // This is only read by diagnostics, which already use `from_utf8_lossy`.
        this.machine
            .threads
            .set_thread_name(thread, String::from_utf16_lossy(new_thread_name).into_bytes());
    }

    /// Get the name of the given thread as raw bytes.
    #[inline]
    fn get_thread_name<'c>(&'c self, thread: ThreadId) -> &'c [u8]
    where
        'mir: 'c,
    {
        self.eval_context_ref().machine.threads.get_thread_name(thread)
    }

    /// Block the given thread so the scheduler will not run it until it is unblocked.
    #[inline]
    fn block_thread(&mut self, thread: ThreadId) {
        self.eval_context_mut().machine.threads.block_thread(thread);
    }

    /// Unblock the given thread, making it schedulable again.
    #[inline]
    fn unblock_thread(&mut self, thread: ThreadId) {
        self.eval_context_mut().machine.threads.unblock_thread(thread);
    }

    /// Ask the scheduler to switch away from the active thread at the next opportunity.
    #[inline]
    fn yield_active_thread(&mut self) {
        self.eval_context_mut().machine.threads.yield_active_thread();
    }

    /// Randomly yield the active thread with probability `preemption_rate`,
    /// to explore more thread interleavings.
    #[inline]
    fn maybe_preempt_active_thread(&mut self) {
        use rand::Rng as _;

        let this = self.eval_context_mut();
        if this.machine.rng.get_mut().gen_bool(this.machine.preemption_rate) {
            this.yield_active_thread();
        }
    }

    /// Register a callback to be executed on the given thread once `call_time` is reached.
    ///
    /// Panics if `call_time` is a `RealTime` time while isolation is enabled, since real
    /// time must not be observable in isolated mode.
    #[inline]
    fn register_timeout_callback(
        &mut self,
        thread: ThreadId,
        call_time: Time,
        callback: TimeoutCallback<'mir, 'tcx>,
    ) {
        let this = self.eval_context_mut();
        if !this.machine.communicate() && matches!(call_time, Time::RealTime(..)) {
            panic!("cannot have `RealTime` callback with isolation enabled!")
        }
        this.machine.threads.register_timeout_callback(thread, call_time, callback);
    }

    /// Remove the timeout callback registered for the given thread, if there is one.
    #[inline]
    fn unregister_timeout_callback_if_exists(&mut self, thread: ThreadId) {
        let this = self.eval_context_mut();
        this.machine.threads.unregister_timeout_callback_if_exists(thread);
    }

    /// Run the core interpreter loop. Returns only when an interrupt occurs (an error or program
    /// termination).
    fn run_threads(&mut self) -> InterpResult<'tcx, !> {
        // Flag set by the Ctrl-C handler below and polled at the top of the loop.
        static SIGNALED: AtomicBool = AtomicBool::new(false);
        ctrlc::set_handler(move || {
            // Indicate that we have been signaled to stop. If we were already signaled, exit
            // immediately. In our interpreter loop we try to consult this value often, but if for
            // whatever reason we don't get to that check or the cleanup we do upon finding that
            // this bool has become true takes a long time, the exit here will promptly exit the
            // process on the second Ctrl-C.
            if SIGNALED.swap(true, Relaxed) {
                std::process::exit(1);
            }
        })
        .unwrap();
        let this = self.eval_context_mut();
        loop {
            if SIGNALED.load(Relaxed) {
                this.machine.handle_abnormal_termination();
                std::process::exit(1);
            }
            match this.machine.threads.schedule(&this.machine.clock)? {
                SchedulingAction::ExecuteStep => {
                    if !this.step()? {
                        // See if this thread can do something else.
                        match this.run_on_stack_empty()? {
                            Poll::Pending => {} // keep going
                            Poll::Ready(()) => this.terminate_active_thread()?,
                        }
                    }
                }
                SchedulingAction::ExecuteTimeoutCallback => {
                    this.run_timeout_callback()?;
                }
                SchedulingAction::Sleep(duration) => {
                    // All threads are blocked; advance the virtual clock to the next timeout.
                    this.machine.clock.sleep(duration);
                }
            }
        }
    }

    /// Handles thread termination of the active thread: wakes up threads joining on this one,
    /// and deallocates thread-local statics.
    ///
    /// This is called by the eval loop when a thread's on_stack_empty returns `Ready`.
    #[inline]
    fn terminate_active_thread(&mut self) -> InterpResult<'tcx> {
        let this = self.eval_context_mut();
        let thread = this.active_thread_mut();
        assert!(thread.stack.is_empty(), "only threads with an empty stack can be terminated");
        thread.state = ThreadState::Terminated;

        let current_span = this.machine.current_span();
        // `thread_terminated` yields the thread's TLS allocations, which we free here.
        for ptr in
            this.machine.threads.thread_terminated(this.machine.data_race.as_mut(), current_span)
        {
            this.deallocate_ptr(ptr.into(), None, MiriMemoryKind::Tls.into())?;
        }
        Ok(())
    }
}
1081