• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Thread module emulating a subset of Java's threading model."""
2
3import os as _os
4import sys as _sys
5import _thread
6import functools
7
8from time import monotonic as _time
9from _weakrefset import WeakSet
10from itertools import islice as _islice, count as _count
11try:
12    from _collections import deque as _deque
13except ImportError:
14    from collections import deque as _deque
15
16# Note regarding PEP 8 compliant names
17#  This threading model was originally inspired by Java, and inherited
18# the convention of camelCase function and method names from that
19# language. Those original names are not in any imminent danger of
# being deprecated (even for Py3k), so this module provides them as an
# alias for the PEP 8 compliant names.
22# Note that using the new PEP 8 compliant names facilitates substitution
23# with the multiprocessing module, which doesn't provide the old
24# Java inspired names.
25
26__all__ = ['get_ident', 'active_count', 'Condition', 'current_thread',
27           'enumerate', 'main_thread', 'TIMEOUT_MAX',
28           'Event', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
29           'Barrier', 'BrokenBarrierError', 'Timer', 'ThreadError',
30           'setprofile', 'settrace', 'local', 'stack_size',
31           'excepthook', 'ExceptHookArgs']
32
33# Rename some stuff so "from threading import *" is safe
_start_new_thread = _thread.start_new_thread
_allocate_lock = _thread.allocate_lock
_set_sentinel = _thread._set_sentinel
get_ident = _thread.get_ident
ThreadError = _thread.error
TIMEOUT_MAX = _thread.TIMEOUT_MAX

# get_native_id() is exported only when the _thread module provides it.
try:
    get_native_id = _thread.get_native_id
except AttributeError:
    _HAVE_THREAD_NATIVE_ID = False
else:
    _HAVE_THREAD_NATIVE_ID = True
    __all__.append('get_native_id')

# RLock type supplied by _thread, or None when _thread has no RLock.
_CRLock = getattr(_thread, 'RLock', None)

# Drop the module reference now that everything needed has been aliased.
del _thread
51
52
53# Support for profile and trace hooks
54
55_profile_hook = None
56_trace_hook = None
57
def setprofile(func):
    """Register *func* as the profile function for threading-module threads.

    For every thread started from the threading module, *func* is handed to
    sys.setprofile() before that thread's run() method is called.  The value
    is stored in the module-level ``_profile_hook`` variable.

    """
    global _profile_hook
    _profile_hook = func
67
def settrace(func):
    """Register *func* as the trace function for threading-module threads.

    For every thread started from the threading module, *func* is handed to
    sys.settrace() before that thread's run() method is called.  The value
    is stored in the module-level ``_trace_hook`` variable.

    """
    global _trace_hook
    _trace_hook = func
77
78# Synchronization classes
79
80Lock = _allocate_lock
81
def RLock(*args, **kwargs):
    """Create and return a new reentrant lock.

    Only the thread that acquired a reentrant lock may release it.  The
    owning thread can acquire the lock again without blocking, and has to
    release it once per acquisition.

    The lock type taken from the _thread module is used when it is
    available; otherwise the pure-Python implementation is used.

    """
    factory = _PyRLock if _CRLock is None else _CRLock
    return factory(*args, **kwargs)
94
class _RLock:
    """Pure-Python implementation of reentrant lock objects.

    A reentrant lock must be released by the thread that acquired it. Once a
    thread has acquired a reentrant lock, the same thread may acquire it
    again without blocking; the thread must release it once for each time it
    has acquired it.

    """

    def __init__(self):
        self._block = _allocate_lock()  # underlying non-reentrant lock
        self._owner = None              # get_ident() of the owner, or None
        self._count = 0                 # current recursion level

    def __repr__(self):
        owner = self._owner
        try:
            # Show the owning thread's name when its ident is registered.
            owner = _active[owner].name
        except KeyError:
            pass
        return "<%s %s.%s object owner=%r count=%d at %s>" % (
            "locked" if self._block.locked() else "unlocked",
            self.__class__.__module__,
            self.__class__.__qualname__,
            owner,
            self._count,
            hex(id(self))
        )

    def _at_fork_reinit(self):
        """Reinitialise the underlying lock and forget owner and count."""
        self._block._at_fork_reinit()
        self._owner = None
        self._count = 0

    def acquire(self, blocking=True, timeout=-1):
        """Acquire the lock, blocking or non-blocking.

        If the calling thread already owns the lock, the recursion level is
        incremented and True is returned immediately.  Otherwise the
        underlying lock is acquired with the given *blocking* and *timeout*
        arguments; on success the calling thread becomes the owner with a
        recursion level of one.

        With *blocking* false the call never blocks.  With a positive
        floating-point *timeout* the call blocks for at most that many
        seconds.

        Return True if the lock was acquired, False otherwise.

        """
        me = get_ident()
        if self._owner == me:
            self._count += 1
            # Return a real bool so that the reentrant path agrees with the
            # result of the underlying lock's acquire().
            return True
        rc = self._block.acquire(blocking, timeout)
        if rc:
            self._owner = me
            self._count = 1
        return rc

    __enter__ = acquire

    def release(self):
        """Release the lock, decrementing the recursion level.

        When the recursion level drops to zero the lock becomes unowned and
        the underlying lock is released, letting one blocked thread proceed.
        Otherwise the lock stays locked and owned by the calling thread.

        Raise RuntimeError if the calling thread does not own the lock.

        There is no return value.

        """
        if self._owner != get_ident():
            raise RuntimeError("cannot release un-acquired lock")
        self._count = count = self._count - 1
        if not count:
            self._owner = None
            self._block.release()

    def __exit__(self, t, v, tb):
        self.release()

    # Internal methods used by condition variables

    def _acquire_restore(self, state):
        """Re-acquire the lock and reinstate a (count, owner) pair."""
        self._block.acquire()
        self._count, self._owner = state

    def _release_save(self):
        """Fully release the lock and return its (count, owner) state.

        Raise RuntimeError if the recursion level is already zero.

        """
        if self._count == 0:
            raise RuntimeError("cannot release un-acquired lock")
        count = self._count
        self._count = 0
        owner = self._owner
        self._owner = None
        self._block.release()
        return (count, owner)

    def _is_owned(self):
        """Return True if the calling thread owns the lock."""
        return self._owner == get_ident()
212
213_PyRLock = _RLock
214
215
class Condition:
    """Class that implements a condition variable.

    A condition variable allows one or more threads to wait until they are
    notified by another thread.

    If the lock argument is given and not None, it must be a Lock or RLock
    object, and it is used as the underlying lock. Otherwise, a new RLock object
    is created and used as the underlying lock.

    """

    def __init__(self, lock=None):
        if lock is None:
            lock = RLock()
        self._lock = lock
        # Export the lock's acquire() and release() methods
        self.acquire = lock.acquire
        self.release = lock.release
        # If the lock defines _release_save(), _acquire_restore() or
        # _is_owned(), those override the default implementations below
        # (which just call release() and acquire() on the lock).
        for name in ('_release_save', '_acquire_restore', '_is_owned'):
            try:
                setattr(self, name, getattr(lock, name))
            except AttributeError:
                pass
        # One lock per waiting thread, in arrival order.
        self._waiters = _deque()

    def _at_fork_reinit(self):
        """Reinitialise the underlying lock and drop all recorded waiters."""
        self._lock._at_fork_reinit()
        self._waiters.clear()

    def __enter__(self):
        return self._lock.__enter__()

    def __exit__(self, *args):
        return self._lock.__exit__(*args)

    def __repr__(self):
        return "<Condition(%s, %d)>" % (self._lock, len(self._waiters))

    def _release_save(self):
        self._lock.release()           # No state to save

    def _acquire_restore(self, x):
        self._lock.acquire()           # Ignore saved state

    def _is_owned(self):
        # Return True if lock is owned by current_thread.
        # This method is called only if _lock doesn't have _is_owned().
        if self._lock.acquire(False):
            self._lock.release()
            return False
        return True

    def wait(self, timeout=None):
        """Wait until notified or until a timeout occurs.

        The underlying lock is released, the caller blocks until it is
        awakened by notify() or notify_all() on this condition (or until the
        optional *timeout*, in seconds, elapses), and the lock is then
        re-acquired before returning.

        When the lock provides _release_save()/_acquire_restore() (as RLock
        does), those are used instead of release()/acquire() so that a
        recursively acquired lock is really unlocked while waiting and its
        recursion level is restored afterwards.

        Return True if the caller was notified, False if the wait timed out.
        Raise RuntimeError if the calling thread does not hold the lock.

        """
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        # The waiter lock starts out held; notify() releases it to wake us.
        waiter = _allocate_lock()
        waiter.acquire()
        self._waiters.append(waiter)
        saved_state = self._release_save()
        gotit = False
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                waiter.acquire()
                gotit = True
            elif timeout > 0:
                gotit = waiter.acquire(True, timeout)
            else:
                gotit = waiter.acquire(False)
            return gotit
        finally:
            self._acquire_restore(saved_state)
            if not gotit:
                # Nobody woke us: take our waiter back out of the queue.
                try:
                    self._waiters.remove(waiter)
                except ValueError:
                    pass

    def wait_for(self, predicate, timeout=None):
        """Wait until a condition evaluates to True.

        *predicate* should be a callable whose result is interpreted as a
        boolean value.  A *timeout* may be provided giving the maximum time
        to wait.  Return the last value returned by *predicate*.

        """
        endtime = None
        waittime = timeout
        result = predicate()
        while not result:
            if waittime is not None:
                if endtime is None:
                    endtime = _time() + waittime
                else:
                    waittime = endtime - _time()
                    if waittime <= 0:
                        break
            self.wait(waittime)
            result = predicate()
        return result

    def notify(self, n=1):
        """Wake up one or more threads waiting on this condition, if any.

        At most *n* waiting threads are woken; the call is a no-op if no
        threads are waiting.

        Raise RuntimeError if the calling thread does not hold the lock.

        """
        if not self._is_owned():
            raise RuntimeError("cannot notify on un-acquired lock")
        waiters = self._waiters
        while waiters and n > 0:
            waiter = waiters[0]
            try:
                waiter.release()
            except RuntimeError:
                # The waiter lock is already unlocked: an earlier notify()
                # released it but was interrupted (for example by an
                # exception raised from a signal handler) before removing
                # it from the queue.  Drop it without counting it as a
                # wake-up.
                pass
            else:
                n -= 1
            try:
                waiters.remove(waiter)
            except ValueError:
                pass

    def notify_all(self):
        """Wake up all threads waiting on this condition.

        Raise RuntimeError if the calling thread does not hold the lock.

        """
        self.notify(len(self._waiters))

    notifyAll = notify_all
384
385
class Semaphore:
    """This class implements semaphore objects.

    Semaphores manage a counter representing the number of release() calls minus
    the number of acquire() calls, plus an initial value. The acquire() method
    blocks if necessary until it can return without making the counter
    negative. If not given, value defaults to 1.

    """

    # After Tim Peters' semaphore class, but not quite the same (no maximum)

    def __init__(self, value=1):
        if value < 0:
            raise ValueError("semaphore initial value must be >= 0")
        self._cond = Condition(Lock())
        self._value = value

    def acquire(self, blocking=True, timeout=None):
        """Acquire a semaphore, decrementing the internal counter by one.

        If the internal counter is larger than zero on entry, decrement it
        by one and return True immediately.  If it is zero on entry, block
        until another thread has called release() to make it larger than
        zero.  The order in which blocked threads are awakened should not
        be relied on.

        When invoked with *blocking* set to false, do not block: return
        False immediately if the counter is zero.

        When invoked with a *timeout* other than None, block for at most
        *timeout* seconds and return False if the semaphore could not be
        acquired in that interval.

        Raise ValueError if a *timeout* is given for a non-blocking call.

        """
        if not blocking and timeout is not None:
            raise ValueError("can't specify timeout for non-blocking acquire")
        rc = False
        endtime = None
        with self._cond:
            while self._value == 0:
                if not blocking:
                    break
                if timeout is not None:
                    if endtime is None:
                        # First pass: fix the deadline.
                        endtime = _time() + timeout
                    else:
                        # Woken without the counter being available:
                        # wait only for the time that is left.
                        timeout = endtime - _time()
                        if timeout <= 0:
                            break
                self._cond.wait(timeout)
            else:
                # Loop ended without break, so the counter is non-zero.
                self._value -= 1
                rc = True
        return rc

    __enter__ = acquire

    def release(self, n=1):
        """Release a semaphore, incrementing the internal counter by one or more.

        The counter is increased by *n* and up to *n* threads blocked in
        acquire() are woken.

        Raise ValueError if *n* is less than one.

        """
        if n < 1:
            raise ValueError('n must be one or more')
        with self._cond:
            self._value += n
            # One call wakes up to n waiters; no need to notify n times.
            self._cond.notify(n)

    def __exit__(self, t, v, tb):
        self.release()
467
468
class BoundedSemaphore(Semaphore):
    """Implements a bounded semaphore.

    A bounded semaphore checks to make sure its current value doesn't exceed its
    initial value. If it does, ValueError is raised. In most situations
    semaphores are used to guard resources with limited capacity.

    If the semaphore is released too many times it's a sign of a bug. If not
    given, value defaults to 1.

    Like regular semaphores, bounded semaphores manage a counter representing
    the number of release() calls minus the number of acquire() calls, plus an
    initial value. The acquire() method blocks if necessary until it can return
    without making the counter negative.

    """

    def __init__(self, value=1):
        super().__init__(value)
        # Upper bound that the counter may never exceed.
        self._initial_value = value

    def release(self, n=1):
        """Release a semaphore, incrementing the internal counter by one or more.

        The counter is increased by *n* and up to *n* threads blocked in
        acquire() are woken.

        Raise ValueError if *n* is less than one, or if the release would
        push the counter above its initial value; in both cases the counter
        is left unchanged.

        """
        if n < 1:
            raise ValueError('n must be one or more')
        with self._cond:
            if self._value + n > self._initial_value:
                raise ValueError("Semaphore released too many times")
            self._value += n
            # One call wakes up to n waiters; no need to notify n times.
            self._cond.notify(n)
508
509
class Event:
    """Class implementing event objects.

    An event wraps a boolean flag.  set() turns the flag on, clear() turns
    it off, and wait() blocks until the flag is on.  A new event starts with
    the flag off.

    """

    # After Tim Peters' event class (without is_posted())

    def __init__(self):
        self._flag = False
        self._cond = Condition(Lock())

    def _at_fork_reinit(self):
        # Private method called by Thread._reset_internal_locks()
        self._cond._at_fork_reinit()

    def is_set(self):
        """Return true if and only if the internal flag is true."""
        return self._flag

    isSet = is_set

    def set(self):
        """Turn the internal flag on and wake every waiting thread.

        Threads that call wait() while the flag is on do not block.

        """
        cond = self._cond
        with cond:
            self._flag = True
            cond.notify_all()

    def clear(self):
        """Turn the internal flag off.

        Threads that call wait() afterwards block until set() is called
        again.

        """
        with self._cond:
            self._flag = False

    def wait(self, timeout=None):
        """Block until the internal flag is true.

        Return immediately when the flag is already on.  Otherwise wait for
        another thread to call set(), or for *timeout* (a number of seconds,
        possibly fractional) to elapse when it is given and not None.

        The return value is True unless a timeout was given and the wait
        timed out.

        """
        with self._cond:
            # Only wait on the condition when the flag is currently off.
            return self._flag or self._cond.wait(timeout)
576
577
578# A barrier class.  Inspired in part by the pthread_barrier_* api and
579# the CyclicBarrier class from Java.  See
580# http://sourceware.org/pthreads-win32/manual/pthread_barrier_init.html and
581# http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/
582#        CyclicBarrier.html
583# for information.
584# We maintain two main states, 'filling' and 'draining' enabling the barrier
585# to be cyclic.  Threads are not allowed into it until it has fully drained
586# since the previous cycle.  In addition, a 'resetting' state exists which is
587# similar to 'draining' except that threads leave with a BrokenBarrierError,
588# and a 'broken' state in which all threads get the exception.
class Barrier:
    """Implements a Barrier.

    Useful for synchronizing a fixed number of threads at known synchronization
    points.  Threads block on 'wait()' and are simultaneously awoken once they
    have all made that call.

    """

    def __init__(self, parties, action=None, timeout=None):
        """Create a barrier, initialised to 'parties' threads.

        'action' is a callable which, when supplied, will be called by one of
        the threads after they have all entered the barrier and just prior to
        releasing them all. If a 'timeout' is provided, it is used as the
        default for all subsequent 'wait()' calls.

        Raise ValueError if 'parties' is less than one, since such a barrier
        could never be tripped.

        """
        if parties < 1:
            raise ValueError("parties must be > 0")
        self._cond = Condition(Lock())
        self._action = action
        self._timeout = timeout
        self._parties = parties
        self._state = 0  # 0 filling, 1 draining, -1 resetting, -2 broken
        self._count = 0  # number of threads currently inside wait()

    def wait(self, timeout=None):
        """Wait for the barrier.

        When the specified number of threads have started waiting, they are all
        simultaneously awoken. If an 'action' was provided for the barrier, one
        of the threads will have executed that callback prior to returning.
        Returns an individual index number from 0 to 'parties-1'.

        If 'timeout' is None, the default given to the constructor is used.
        Raise BrokenBarrierError if the barrier is broken or reset, or if
        the wait times out.

        """
        if timeout is None:
            timeout = self._timeout
        with self._cond:
            self._enter() # Block while the barrier drains.
            index = self._count
            self._count += 1
            try:
                if index + 1 == self._parties:
                    # We release the barrier
                    self._release()
                else:
                    # We wait until someone releases us
                    self._wait(timeout)
                return index
            finally:
                self._count -= 1
                # Wake up any threads waiting for barrier to drain.
                self._exit()

    # Block until the barrier is ready for us, or raise an exception
    # if it is broken.
    def _enter(self):
        while self._state in (-1, 1):
            # It is draining or resetting, wait until done
            self._cond.wait()
        # See if the barrier is in a broken state.
        if self._state < 0:
            raise BrokenBarrierError
        assert self._state == 0

    # Optionally run the 'action' and release the threads waiting
    # in the barrier.
    def _release(self):
        try:
            if self._action:
                self._action()
            # enter draining state
            self._state = 1
            self._cond.notify_all()
        except BaseException:
            # An exception during the _action handler.  Break and reraise.
            self._break()
            raise

    # Wait in the barrier until we are released.  Raise an exception
    # if the barrier is reset or broken.
    def _wait(self, timeout):
        if not self._cond.wait_for(lambda : self._state != 0, timeout):
            # Timed out.  Break the barrier.
            self._break()
            raise BrokenBarrierError
        if self._state < 0:
            raise BrokenBarrierError
        assert self._state == 1

    # If we are the last thread to exit the barrier, signal any threads
    # waiting for the barrier to drain.
    def _exit(self):
        if self._count == 0:
            if self._state in (-1, 1):
                # Resetting or draining: go back to filling.
                self._state = 0
                self._cond.notify_all()

    def reset(self):
        """Reset the barrier to the initial state.

        Any threads currently waiting will get the BrokenBarrierError
        exception raised.

        """
        with self._cond:
            if self._count > 0:
                if self._state == 0:
                    # Reset the barrier, waking up threads.
                    self._state = -1
                elif self._state == -2:
                    # Was broken: set it to the resetting state, which
                    # clears when the last thread exits.
                    self._state = -1
            else:
                self._state = 0
            self._cond.notify_all()

    def abort(self):
        """Place the barrier into a 'broken' state.

        Useful in case of error.  Any currently waiting threads and threads
        attempting to 'wait()' will have BrokenBarrierError raised.

        """
        with self._cond:
            self._break()

    def _break(self):
        # An internal error was detected.  The barrier is set to a broken
        # state and all parties are awakened.
        self._state = -2
        self._cond.notify_all()

    @property
    def parties(self):
        """Return the number of threads required to trip the barrier."""
        return self._parties

    @property
    def n_waiting(self):
        """Return the number of threads currently waiting at the barrier."""
        # We don't need synchronization here since this is an ephemeral result
        # anyway.  It returns the correct value in the steady state.
        if self._state == 0:
            return self._count
        return 0

    @property
    def broken(self):
        """Return True if the barrier is in a broken state."""
        return self._state == -2
741
742# exception raised by the Barrier class
class BrokenBarrierError(RuntimeError):
    """Raised by Barrier when it is broken, reset, or a wait times out."""
745
746
# Helper to generate new thread names.  The sequence starts at 1, so the
# first generated name carries the number 1.
_counter = _count(1).__next__
def _newname(template="Thread-%d"):
    """Return *template* formatted with the next number in the sequence."""
    return template % _counter()
752
753# Active thread administration
754_active_limbo_lock = _allocate_lock()
755_active = {}    # maps thread id to Thread object
756_limbo = {}
757_dangling = WeakSet()
758# Set of Thread._tstate_lock locks of non-daemon threads used by _shutdown()
759# to wait until all Python thread states get deleted:
760# see Thread._set_tstate_lock().
761_shutdown_locks_lock = _allocate_lock()
762_shutdown_locks = set()
763
764# Main class for threads
765
766class Thread:
767    """A class that represents a thread of control.
768
769    This class can be safely subclassed in a limited fashion. There are two ways
770    to specify the activity: by passing a callable object to the constructor, or
771    by overriding the run() method in a subclass.
772
773    """
774
775    _initialized = False
776
777    def __init__(self, group=None, target=None, name=None,
778                 args=(), kwargs=None, *, daemon=None):
779        """This constructor should always be called with keyword arguments. Arguments are:
780
781        *group* should be None; reserved for future extension when a ThreadGroup
782        class is implemented.
783
784        *target* is the callable object to be invoked by the run()
785        method. Defaults to None, meaning nothing is called.
786
787        *name* is the thread name. By default, a unique name is constructed of
788        the form "Thread-N" where N is a small decimal number.
789
790        *args* is the argument tuple for the target invocation. Defaults to ().
791
792        *kwargs* is a dictionary of keyword arguments for the target
793        invocation. Defaults to {}.
794
795        If a subclass overrides the constructor, it must make sure to invoke
796        the base class constructor (Thread.__init__()) before doing anything
797        else to the thread.
798
799        """
800        assert group is None, "group argument must be None for now"
801        if kwargs is None:
802            kwargs = {}
803        self._target = target
804        self._name = str(name or _newname())
805        self._args = args
806        self._kwargs = kwargs
807        if daemon is not None:
808            self._daemonic = daemon
809        else:
810            self._daemonic = current_thread().daemon
811        self._ident = None
812        if _HAVE_THREAD_NATIVE_ID:
813            self._native_id = None
814        self._tstate_lock = None
815        self._started = Event()
816        self._is_stopped = False
817        self._initialized = True
818        # Copy of sys.stderr used by self._invoke_excepthook()
819        self._stderr = _sys.stderr
820        self._invoke_excepthook = _make_invoke_excepthook()
821        # For debugging and _after_fork()
822        _dangling.add(self)
823
824    def _reset_internal_locks(self, is_alive):
825        # private!  Called by _after_fork() to reset our internal locks as
826        # they may be in an invalid state leading to a deadlock or crash.
827        self._started._at_fork_reinit()
828        if is_alive:
829            # bpo-42350: If the fork happens when the thread is already stopped
830            # (ex: after threading._shutdown() has been called), _tstate_lock
831            # is None. Do nothing in this case.
832            if self._tstate_lock is not None:
833                self._tstate_lock._at_fork_reinit()
834                self._tstate_lock.acquire()
835        else:
836            # The thread isn't alive after fork: it doesn't have a tstate
837            # anymore.
838            self._is_stopped = True
839            self._tstate_lock = None
840
841    def __repr__(self):
842        assert self._initialized, "Thread.__init__() was not called"
843        status = "initial"
844        if self._started.is_set():
845            status = "started"
846        self.is_alive() # easy way to get ._is_stopped set when appropriate
847        if self._is_stopped:
848            status = "stopped"
849        if self._daemonic:
850            status += " daemon"
851        if self._ident is not None:
852            status += " %s" % self._ident
853        return "<%s(%s, %s)>" % (self.__class__.__name__, self._name, status)
854
855    def start(self):
856        """Start the thread's activity.
857
858        It must be called at most once per thread object. It arranges for the
859        object's run() method to be invoked in a separate thread of control.
860
861        This method will raise a RuntimeError if called more than once on the
862        same thread object.
863
864        """
865        if not self._initialized:
866            raise RuntimeError("thread.__init__() not called")
867
868        if self._started.is_set():
869            raise RuntimeError("threads can only be started once")
870
871        with _active_limbo_lock:
872            _limbo[self] = self
873        try:
874            _start_new_thread(self._bootstrap, ())
875        except Exception:
876            with _active_limbo_lock:
877                del _limbo[self]
878            raise
879        self._started.wait()
880
881    def run(self):
882        """Method representing the thread's activity.
883
884        You may override this method in a subclass. The standard run() method
885        invokes the callable object passed to the object's constructor as the
886        target argument, if any, with sequential and keyword arguments taken
887        from the args and kwargs arguments, respectively.
888
889        """
890        try:
891            if self._target:
892                self._target(*self._args, **self._kwargs)
893        finally:
894            # Avoid a refcycle if the thread is running a function with
895            # an argument that has a member that points to the thread.
896            del self._target, self._args, self._kwargs
897
898    def _bootstrap(self):
899        # Wrapper around the real bootstrap code that ignores
900        # exceptions during interpreter cleanup.  Those typically
901        # happen when a daemon thread wakes up at an unfortunate
902        # moment, finds the world around it destroyed, and raises some
903        # random exception *** while trying to report the exception in
904        # _bootstrap_inner() below ***.  Those random exceptions
905        # don't help anybody, and they confuse users, so we suppress
906        # them.  We suppress them only when it appears that the world
907        # indeed has already been destroyed, so that exceptions in
908        # _bootstrap_inner() during normal business hours are properly
909        # reported.  Also, we only suppress them for daemonic threads;
910        # if a non-daemonic encounters this, something else is wrong.
911        try:
912            self._bootstrap_inner()
913        except:
914            if self._daemonic and _sys is None:
915                return
916            raise
917
    def _set_ident(self):
        """Store the value returned by get_ident() as this thread's ident.

        get_ident() is evaluated in the calling thread, so this has to be
        called from the thread that this object represents.
        """
        self._ident = get_ident()
920
    # Only defined when the platform provides _thread.get_native_id().
    if _HAVE_THREAD_NATIVE_ID:
        def _set_native_id(self):
            """Store the value returned by get_native_id() as the native id."""
            self._native_id = get_native_id()
924
925    def _set_tstate_lock(self):
926        """
927        Set a lock object which will be released by the interpreter when
928        the underlying thread state (see pystate.h) gets deleted.
929        """
930        self._tstate_lock = _set_sentinel()
931        self._tstate_lock.acquire()
932
933        if not self.daemon:
934            with _shutdown_locks_lock:
935                _shutdown_locks.add(self._tstate_lock)
936
    def _bootstrap_inner(self):
        """Body of the new thread: register it, run it, then unregister it.

        Called by _bootstrap().  Exceptions escaping run() are handed to
        self._invoke_excepthook() rather than propagated.
        """
        try:
            # Publish the identity and the tstate lock before signalling
            # _started, which is what start() is waiting on.
            self._set_ident()
            self._set_tstate_lock()
            if _HAVE_THREAD_NATIVE_ID:
                self._set_native_id()
            self._started.set()
            # Move the thread from the "starting" dict to the "running" one.
            with _active_limbo_lock:
                _active[self._ident] = self
                del _limbo[self]

            # Install the module-level trace/profile hooks, if any were set.
            if _trace_hook:
                _sys.settrace(_trace_hook)
            if _profile_hook:
                _sys.setprofile(_profile_hook)

            try:
                self.run()
            except:
                # Bare except: every exception raised by run(), including
                # BaseException subclasses, is routed to the except hook.
                self._invoke_excepthook(self)
        finally:
            with _active_limbo_lock:
                try:
                    # We don't call self._delete() because it also
                    # grabs _active_limbo_lock.
                    del _active[get_ident()]
                except:
                    # Best effort: failing to unregister is ignored.
                    pass
965
966    def _stop(self):
967        # After calling ._stop(), .is_alive() returns False and .join() returns
968        # immediately.  ._tstate_lock must be released before calling ._stop().
969        #
970        # Normal case:  C code at the end of the thread's life
971        # (release_sentinel in _threadmodule.c) releases ._tstate_lock, and
972        # that's detected by our ._wait_for_tstate_lock(), called by .join()
973        # and .is_alive().  Any number of threads _may_ call ._stop()
974        # simultaneously (for example, if multiple threads are blocked in
975        # .join() calls), and they're not serialized.  That's harmless -
976        # they'll just make redundant rebindings of ._is_stopped and
977        # ._tstate_lock.  Obscure:  we rebind ._tstate_lock last so that the
978        # "assert self._is_stopped" in ._wait_for_tstate_lock() always works
979        # (the assert is executed only if ._tstate_lock is None).
980        #
981        # Special case:  _main_thread releases ._tstate_lock via this
982        # module's _shutdown() function.
983        lock = self._tstate_lock
984        if lock is not None:
985            assert not lock.locked()
986        self._is_stopped = True
987        self._tstate_lock = None
988        if not self.daemon:
989            with _shutdown_locks_lock:
990                _shutdown_locks.discard(lock)
991
    def _delete(self):
        """Remove current thread from the dict of currently running threads.

        The entry is keyed by get_ident(), i.e. by the identifier of the
        calling thread, and is removed while _active_limbo_lock is held.
        """
        with _active_limbo_lock:
            del _active[get_ident()]
            # There must not be any python code between the previous line
            # and after the lock is released.  Otherwise a tracing function
            # could try to acquire the lock again in the same thread, (in
            # current_thread()), and would block.
1000
1001    def join(self, timeout=None):
1002        """Wait until the thread terminates.
1003
1004        This blocks the calling thread until the thread whose join() method is
1005        called terminates -- either normally or through an unhandled exception
1006        or until the optional timeout occurs.
1007
1008        When the timeout argument is present and not None, it should be a
1009        floating point number specifying a timeout for the operation in seconds
1010        (or fractions thereof). As join() always returns None, you must call
1011        is_alive() after join() to decide whether a timeout happened -- if the
1012        thread is still alive, the join() call timed out.
1013
1014        When the timeout argument is not present or None, the operation will
1015        block until the thread terminates.
1016
1017        A thread can be join()ed many times.
1018
1019        join() raises a RuntimeError if an attempt is made to join the current
1020        thread as that would cause a deadlock. It is also an error to join() a
1021        thread before it has been started and attempts to do so raises the same
1022        exception.
1023
1024        """
1025        if not self._initialized:
1026            raise RuntimeError("Thread.__init__() not called")
1027        if not self._started.is_set():
1028            raise RuntimeError("cannot join thread before it is started")
1029        if self is current_thread():
1030            raise RuntimeError("cannot join current thread")
1031
1032        if timeout is None:
1033            self._wait_for_tstate_lock()
1034        else:
1035            # the behavior of a negative timeout isn't documented, but
1036            # historically .join(timeout=x) for x<0 has acted as if timeout=0
1037            self._wait_for_tstate_lock(timeout=max(timeout, 0))
1038
1039    def _wait_for_tstate_lock(self, block=True, timeout=-1):
1040        # Issue #18808: wait for the thread state to be gone.
1041        # At the end of the thread's life, after all knowledge of the thread
1042        # is removed from C data structures, C code releases our _tstate_lock.
1043        # This method passes its arguments to _tstate_lock.acquire().
1044        # If the lock is acquired, the C code is done, and self._stop() is
1045        # called.  That sets ._is_stopped to True, and ._tstate_lock to None.
1046        lock = self._tstate_lock
1047        if lock is None:  # already determined that the C code is done
1048            assert self._is_stopped
1049        elif lock.acquire(block, timeout):
1050            lock.release()
1051            self._stop()
1052
    @property
    def name(self):
        """A string used for identification purposes only.

        It has no semantics. Multiple threads may be given the same name. The
        initial name is set by the constructor.

        """
        # Catch subclasses that did not call Thread.__init__().
        assert self._initialized, "Thread.__init__() not called"
        return self._name

    @name.setter
    def name(self, name):
        # Catch subclasses that did not call Thread.__init__().
        assert self._initialized, "Thread.__init__() not called"
        # The value is coerced with str(), so the stored name is always a
        # string.
        self._name = str(name)
1068
    @property
    def ident(self):
        """Thread identifier of this thread or None if it has not been started.

        This is a nonzero integer. See the get_ident() function. Thread
        identifiers may be recycled when a thread exits and another thread is
        created. The identifier is available even after the thread has exited.

        """
        # Catch subclasses that did not call Thread.__init__().
        assert self._initialized, "Thread.__init__() not called"
        return self._ident
1080
    # The property only exists when _thread.get_native_id() is available.
    if _HAVE_THREAD_NATIVE_ID:
        @property
        def native_id(self):
            """Native integral thread ID of this thread, or None if it has not been started.

            This is a non-negative integer. See the get_native_id() function.
            This represents the Thread ID as reported by the kernel.

            """
            # Catch subclasses that did not call Thread.__init__().
            assert self._initialized, "Thread.__init__() not called"
            return self._native_id
1092
1093    def is_alive(self):
1094        """Return whether the thread is alive.
1095
1096        This method returns True just before the run() method starts until just
1097        after the run() method terminates. The module function enumerate()
1098        returns a list of all alive threads.
1099
1100        """
1101        assert self._initialized, "Thread.__init__() not called"
1102        if self._is_stopped or not self._started.is_set():
1103            return False
1104        self._wait_for_tstate_lock(False)
1105        return not self._is_stopped
1106
    @property
    def daemon(self):
        """A boolean value indicating whether this thread is a daemon thread.

        This must be set before start() is called, otherwise RuntimeError is
        raised. Its initial value is inherited from the creating thread; the
        main thread is not a daemon thread and therefore all threads created in
        the main thread default to daemon = False.

        The entire Python program exits when only daemon threads are left.

        """
        # Catch subclasses that did not call Thread.__init__().
        assert self._initialized, "Thread.__init__() not called"
        return self._daemonic

    @daemon.setter
    def daemon(self, daemonic):
        # Unlike the getter, the setter raises RuntimeError (not an
        # assertion) when Thread.__init__() was not called.
        if not self._initialized:
            raise RuntimeError("Thread.__init__() not called")
        # The flag can no longer change once the _started event is set.
        if self._started.is_set():
            raise RuntimeError("cannot set daemon status of active thread")
        self._daemonic = daemonic
1129
    def isDaemon(self):
        """Return the daemon flag; camelCase alias for the ``daemon`` property."""
        return self.daemon

    def setDaemon(self, daemonic):
        """Set the daemon flag; camelCase alias for assigning ``daemon``."""
        self.daemon = daemonic

    def getName(self):
        """Return the thread name; camelCase alias for the ``name`` property."""
        return self.name

    def setName(self, name):
        """Set the thread name; camelCase alias for assigning ``name``."""
        self.name = name
1141
1142
try:
    from _thread import (_excepthook as excepthook,
                         _ExceptHookArgs as ExceptHookArgs)
except ImportError:
    # Simple Python implementation if _thread._excepthook() is not available
    from traceback import print_exception as _print_exception
    from collections import namedtuple

    _ExceptHookArgs = namedtuple(
        'ExceptHookArgs',
        'exc_type exc_value exc_traceback thread')

    def ExceptHookArgs(args):
        """Build the hook argument from a 4-item iterable.

        The items are, in order: exc_type, exc_value, exc_traceback, thread.
        """
        return _ExceptHookArgs(*args)

    def excepthook(args, /):
        """
        Handle uncaught Thread.run() exception.

        *args* is an ExceptHookArgs value.  SystemExit is ignored silently;
        any other exception is printed, with its traceback, to sys.stderr
        or, when that is unavailable, to the stderr saved on the thread.
        """
        # Compare by identity: only SystemExit itself is silenced, and the
        # check cannot be fooled by a class with a custom __eq__.
        if args.exc_type is SystemExit:
            # silently ignore SystemExit
            return

        if _sys is not None and _sys.stderr is not None:
            stderr = _sys.stderr
        elif args.thread is not None:
            stderr = args.thread._stderr
            if stderr is None:
                # do nothing if sys.stderr is None and sys.stderr was None
                # when the thread was created
                return
        else:
            # do nothing if sys.stderr is None and args.thread is None
            return

        if args.thread is not None:
            name = args.thread.name
        else:
            # No Thread object: identify the thread by its ident instead.
            name = get_ident()
        print(f"Exception in thread {name}:",
              file=stderr, flush=True)
        _print_exception(args.exc_type, args.exc_value, args.exc_traceback,
                         file=stderr)
        stderr.flush()
1187
1188
def _make_invoke_excepthook():
    """Return a function that reports the current exception for a thread.

    The returned invoke_excepthook(thread) calls threading.excepthook with
    the exception from sys.exc_info().  If the hook itself raises an
    Exception, that failure is reported through sys.excepthook.

    Raises RuntimeError if threading.excepthook or sys.excepthook is None
    at the time this factory is called.
    """
    # Create a local namespace to ensure that variables remain alive
    # when _invoke_excepthook() is called, even if it is called late during
    # Python shutdown. It is mostly needed for daemon threads.

    old_excepthook = excepthook
    old_sys_excepthook = _sys.excepthook
    if old_excepthook is None:
        raise RuntimeError("threading.excepthook is None")
    if old_sys_excepthook is None:
        raise RuntimeError("sys.excepthook is None")

    sys_exc_info = _sys.exc_info
    local_print = print
    local_sys = _sys

    def invoke_excepthook(thread):
        global excepthook
        try:
            # Prefer the current module-level hook; fall back to the one
            # captured at creation time if the global is None.
            hook = excepthook
            if hook is None:
                hook = old_excepthook

            args = ExceptHookArgs([*sys_exc_info(), thread])

            hook(args)
        except Exception as exc:
            # The hook failed: report that failure without chaining the
            # original exception as context.
            exc.__suppress_context__ = True
            del exc

            if local_sys is not None and local_sys.stderr is not None:
                stderr = local_sys.stderr
            else:
                # Fall back to the stderr saved on the thread object.
                stderr = thread._stderr

            local_print("Exception in threading.excepthook:",
                        file=stderr, flush=True)

            if local_sys is not None and local_sys.excepthook is not None:
                sys_excepthook = local_sys.excepthook
            else:
                sys_excepthook = old_sys_excepthook

            sys_excepthook(*sys_exc_info())
        finally:
            # Break reference cycle (exception stored in a variable)
            args = None

    return invoke_excepthook
1238
1239
1240# The timer class was contributed by Itamar Shtull-Trauring
1241
class Timer(Thread):
    """Call a function after a specified number of seconds:

            t = Timer(30.0, f, args=None, kwargs=None)
            t.start()
            t.cancel()     # stop the timer's action if it's still waiting

    """

    def __init__(self, interval, function, args=None, kwargs=None):
        """Remember what to call and after how many seconds.

        *args* defaults to an empty list and *kwargs* to an empty dict.
        """
        Thread.__init__(self)
        self.interval = interval
        self.function = function
        self.args = [] if args is None else args
        self.kwargs = {} if kwargs is None else kwargs
        # Set by cancel(), and by run() once the function has returned.
        self.finished = Event()

    def cancel(self):
        """Stop the timer if it hasn't finished yet."""
        self.finished.set()

    def run(self):
        """Wait for the interval, then call the function unless cancelled."""
        self.finished.wait(self.interval)
        was_cancelled = self.finished.is_set()
        if not was_cancelled:
            self.function(*self.args, **self.kwargs)
        self.finished.set()
1268
1269
1270# Special thread class to represent the main thread
1271
class _MainThread(Thread):
    """Thread object representing the main thread.

    The instance is not start()ed: the constructor marks it as started and
    registers it in _active itself.
    """

    def __init__(self):
        Thread.__init__(self, name="MainThread", daemon=False)
        # Perform by hand the bookkeeping that _bootstrap_inner() does for
        # threads launched through start().
        self._set_tstate_lock()
        self._started.set()
        self._set_ident()
        if _HAVE_THREAD_NATIVE_ID:
            self._set_native_id()
        with _active_limbo_lock:
            _active[self._ident] = self
1283
1284
1285# Dummy thread class to represent threads not started here.
1286# These aren't garbage collected when they die, nor can they be waited for.
1287# If they invoke anything in threading.py that calls current_thread(), they
1288# leave an entry in the _active dict forever after.
1289# Their purpose is to return *something* from current_thread().
1290# They are marked as daemon threads so we won't wait for them
1291# when we exit (conform previous semantics).
1292
class _DummyThread(Thread):
    """Stand-in Thread object for a thread that was not started here.

    Instances are created by current_thread() when the calling thread has
    no entry in _active.  They are always daemonic, are registered in
    _active on creation, and cannot be joined.
    """

    def __init__(self):
        Thread.__init__(self, name=_newname("Dummy-%d"), daemon=True)

        # Mark the object as already running in the calling thread.
        self._started.set()
        self._set_ident()
        if _HAVE_THREAD_NATIVE_ID:
            self._set_native_id()
        with _active_limbo_lock:
            _active[self._ident] = self

    def _stop(self):
        # Nothing to do: no tstate lock was set up for a dummy thread.
        pass

    def is_alive(self):
        """Return True; a dummy thread is always reported as alive."""
        assert not self._is_stopped and self._started.is_set()
        return True

    def join(self, timeout=None):
        """Always raise RuntimeError: a dummy thread cannot be waited for.

        An explicit exception is used instead of an assertion so that the
        call is still rejected when Python runs with -O, where assert
        statements are removed and the call would silently return.
        """
        raise RuntimeError("cannot join a dummy thread")
1314
1315
1316# Global API functions
1317
def current_thread():
    """Return the current Thread object, corresponding to the caller's thread of control.

    When the caller's thread of control was not created through the
    threading module, a dummy thread object with limited functionality is
    created and returned instead.

    """
    ident = get_ident()
    try:
        thread = _active[ident]
    except KeyError:
        thread = _DummyThread()
    return thread

currentThread = current_thread
1331
def active_count():
    """Return the number of Thread objects currently alive.

    The result equals the length of the list that enumerate() returns.

    """
    with _active_limbo_lock:
        running = len(_active)
        starting = len(_limbo)
        return running + starting

activeCount = active_count
1343
def _enumerate():
    # Lock-free variant of enumerate(). Internal use only.
    return [*_active.values(), *_limbo.values()]
1347
def enumerate():
    """Return a list of all Thread objects currently alive.

    Daemonic threads, dummy thread objects created by current_thread(), and
    the main thread are all included. Terminated threads and threads that
    have not yet been started are not.

    """
    with _active_limbo_lock:
        threads = list(_active.values())
        threads.extend(_limbo.values())
        return threads
1358
1359
1360_threading_atexits = []
1361_SHUTTING_DOWN = False
1362
1363def _register_atexit(func, *arg, **kwargs):
1364    """CPython internal: register *func* to be called before joining threads.
1365
1366    The registered *func* is called with its arguments just before all
1367    non-daemon threads are joined in `_shutdown()`. It provides a similar
1368    purpose to `atexit.register()`, but its functions are called prior to
1369    threading shutdown instead of interpreter shutdown.
1370
1371    For similarity to atexit, the registered functions are called in reverse.
1372    """
1373    if _SHUTTING_DOWN:
1374        raise RuntimeError("can't register atexit after shutdown")
1375
1376    call = functools.partial(func, *arg, **kwargs)
1377    _threading_atexits.append(call)
1378
1379
1380from _thread import stack_size
1381
# Create the main thread object,
# and make it available for the interpreter
# (Py_Main) as threading._shutdown.

_main_thread = _MainThread()

def _shutdown():
    """
    Wait until the Python thread state of all non-daemon threads get deleted.

    Before waiting, the main thread's tstate lock is released, the main
    thread is marked as stopped, and the callables registered with
    _register_atexit() are run in reverse registration order.  Calling
    this function again after that is a no-op.
    """
    # Obscure:  other threads may be waiting to join _main_thread.  That's
    # dubious, but some code does it.  We can't wait for C code to release
    # the main thread's tstate_lock - that won't happen until the interpreter
    # is nearly dead.  So we release it here.  Note that just calling _stop()
    # isn't enough:  other threads may already be waiting on _tstate_lock.
    if _main_thread._is_stopped:
        # _shutdown() was already called
        return

    # From here on _register_atexit() refuses new registrations.
    global _SHUTTING_DOWN
    _SHUTTING_DOWN = True
    # Main thread
    tlock = _main_thread._tstate_lock
    # The main thread isn't finished yet, so its thread state lock can't have
    # been released.
    assert tlock is not None
    assert tlock.locked()
    tlock.release()
    _main_thread._stop()

    # Call registered threading atexit functions before threads are joined.
    # Order is reversed, similar to atexit.
    for atexit_call in reversed(_threading_atexits):
        atexit_call()

    # Join all non-daemon threads
    while True:
        # Take a snapshot of the registered locks and empty the set while
        # holding its guard lock; the waiting below happens without it.
        with _shutdown_locks_lock:
            locks = list(_shutdown_locks)
            _shutdown_locks.clear()

        if not locks:
            break

        for lock in locks:
            # mimic Thread.join()
            lock.acquire()
            lock.release()

        # new threads can be spawned while we were waiting for the other
        # threads to complete
1433
1434
def main_thread():
    """Return the main thread object.

    In normal conditions, the main thread is the thread from which the
    Python interpreter was started.

    The value is the module-level _main_thread, which _after_fork()
    rebinds in a forked child.
    """
    return _main_thread
1442
1443# get thread-local implementation, either from the thread
1444# module, or from the python fallback
1445
1446try:
1447    from _thread import _local as local
1448except ImportError:
1449    from _threading_local import local
1450
1451
def _after_fork():
    """
    Cleanup threading module state that should not exist after a fork.

    Registered below as the ``after_in_child`` hook of
    os.register_at_fork() when that function exists.  It replaces the
    module-level locks, makes the calling thread the new main thread,
    marks every other known thread as stopped, and leaves the calling
    thread as the only entry in _active.
    """
    # Reset _active_limbo_lock, in case we forked while the lock was held
    # by another (non-forked) thread.  http://bugs.python.org/issue874900
    global _active_limbo_lock, _main_thread
    global _shutdown_locks_lock, _shutdown_locks
    _active_limbo_lock = _allocate_lock()

    # fork() only copied the current thread; clear references to others.
    new_active = {}

    try:
        current = _active[get_ident()]
    except KeyError:
        # fork() was called in a thread which was not spawned
        # by threading.Thread. For example, a thread spawned
        # by thread.start_new_thread().
        current = _MainThread()

    # Whichever thread called fork() becomes the main thread of the child.
    _main_thread = current

    # reset _shutdown() locks: threads re-register their _tstate_lock below
    _shutdown_locks_lock = _allocate_lock()
    _shutdown_locks = set()

    with _active_limbo_lock:
        # Dangling thread instances must still have their locks reset,
        # because someone may join() them.
        threads = set(_enumerate())
        threads.update(_dangling)
        for thread in threads:
            # Any lock/condition variable may be currently locked or in an
            # invalid state, so we reinitialize them.
            if thread is current:
                # There is only one active thread. We reset the ident to
                # its new value since it can have changed.
                thread._reset_internal_locks(True)
                ident = get_ident()
                thread._ident = ident
                new_active[ident] = thread
            else:
                # All the others are already stopped.
                thread._reset_internal_locks(False)
                thread._stop()

        # Replace the registries with the single surviving thread.
        _limbo.clear()
        _active.clear()
        _active.update(new_active)
        assert len(_active) == 1


# Install the hook only where os.register_at_fork() is available.
if hasattr(_os, "register_at_fork"):
    _os.register_at_fork(after_in_child=_after_fork)
1507