• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Thread module emulating a subset of Java's threading model."""
2
3import os as _os
4import sys as _sys
5import _thread
6
7from time import monotonic as _time
8from traceback import format_exc as _format_exc
9from _weakrefset import WeakSet
10from itertools import islice as _islice, count as _count
11try:
12    from _collections import deque as _deque
13except ImportError:
14    from collections import deque as _deque
15
16# Note regarding PEP 8 compliant names
17#  This threading model was originally inspired by Java, and inherited
18# the convention of camelCase function and method names from that
19# language. Those original names are not in any imminent danger of
20# being deprecated (even for Py3k),so this module provides them as an
21# alias for the PEP 8 compliant names
22# Note that using the new PEP 8 compliant names facilitates substitution
23# with the multiprocessing module, which doesn't provide the old
24# Java inspired names.
25
26__all__ = ['get_ident', 'active_count', 'Condition', 'current_thread',
27           'enumerate', 'main_thread', 'TIMEOUT_MAX',
28           'Event', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
29           'Barrier', 'BrokenBarrierError', 'Timer', 'ThreadError',
30           'setprofile', 'settrace', 'local', 'stack_size']
31
32# Rename some stuff so "from threading import *" is safe
33_start_new_thread = _thread.start_new_thread
34_allocate_lock = _thread.allocate_lock
35_set_sentinel = _thread._set_sentinel
36get_ident = _thread.get_ident
37ThreadError = _thread.error
38try:
39    _CRLock = _thread.RLock
40except AttributeError:
41    _CRLock = None
42TIMEOUT_MAX = _thread.TIMEOUT_MAX
43del _thread
44
45
46# Support for profile and trace hooks
47
48_profile_hook = None
49_trace_hook = None
50
51def setprofile(func):
52    """Set a profile function for all threads started from the threading module.
53
54    The func will be passed to sys.setprofile() for each thread, before its
55    run() method is called.
56
57    """
58    global _profile_hook
59    _profile_hook = func
60
61def settrace(func):
62    """Set a trace function for all threads started from the threading module.
63
64    The func will be passed to sys.settrace() for each thread, before its run()
65    method is called.
66
67    """
68    global _trace_hook
69    _trace_hook = func
70
71# Synchronization classes
72
73Lock = _allocate_lock
74
75def RLock(*args, **kwargs):
76    """Factory function that returns a new reentrant lock.
77
78    A reentrant lock must be released by the thread that acquired it. Once a
79    thread has acquired a reentrant lock, the same thread may acquire it again
80    without blocking; the thread must release it once for each time it has
81    acquired it.
82
83    """
84    if _CRLock is None:
85        return _PyRLock(*args, **kwargs)
86    return _CRLock(*args, **kwargs)
87
88class _RLock:
89    """This class implements reentrant lock objects.
90
91    A reentrant lock must be released by the thread that acquired it. Once a
92    thread has acquired a reentrant lock, the same thread may acquire it
93    again without blocking; the thread must release it once for each time it
94    has acquired it.
95
96    """
97
98    def __init__(self):
99        self._block = _allocate_lock()
100        self._owner = None
101        self._count = 0
102
103    def __repr__(self):
104        owner = self._owner
105        try:
106            owner = _active[owner].name
107        except KeyError:
108            pass
109        return "<%s %s.%s object owner=%r count=%d at %s>" % (
110            "locked" if self._block.locked() else "unlocked",
111            self.__class__.__module__,
112            self.__class__.__qualname__,
113            owner,
114            self._count,
115            hex(id(self))
116        )
117
118    def acquire(self, blocking=True, timeout=-1):
119        """Acquire a lock, blocking or non-blocking.
120
121        When invoked without arguments: if this thread already owns the lock,
122        increment the recursion level by one, and return immediately. Otherwise,
123        if another thread owns the lock, block until the lock is unlocked. Once
124        the lock is unlocked (not owned by any thread), then grab ownership, set
125        the recursion level to one, and return. If more than one thread is
126        blocked waiting until the lock is unlocked, only one at a time will be
127        able to grab ownership of the lock. There is no return value in this
128        case.
129
130        When invoked with the blocking argument set to true, do the same thing
131        as when called without arguments, and return true.
132
133        When invoked with the blocking argument set to false, do not block. If a
134        call without an argument would block, return false immediately;
135        otherwise, do the same thing as when called without arguments, and
136        return true.
137
138        When invoked with the floating-point timeout argument set to a positive
139        value, block for at most the number of seconds specified by timeout
140        and as long as the lock cannot be acquired.  Return true if the lock has
141        been acquired, false if the timeout has elapsed.
142
143        """
144        me = get_ident()
145        if self._owner == me:
146            self._count += 1
147            return 1
148        rc = self._block.acquire(blocking, timeout)
149        if rc:
150            self._owner = me
151            self._count = 1
152        return rc
153
154    __enter__ = acquire
155
156    def release(self):
157        """Release a lock, decrementing the recursion level.
158
159        If after the decrement it is zero, reset the lock to unlocked (not owned
160        by any thread), and if any other threads are blocked waiting for the
161        lock to become unlocked, allow exactly one of them to proceed. If after
162        the decrement the recursion level is still nonzero, the lock remains
163        locked and owned by the calling thread.
164
165        Only call this method when the calling thread owns the lock. A
166        RuntimeError is raised if this method is called when the lock is
167        unlocked.
168
169        There is no return value.
170
171        """
172        if self._owner != get_ident():
173            raise RuntimeError("cannot release un-acquired lock")
174        self._count = count = self._count - 1
175        if not count:
176            self._owner = None
177            self._block.release()
178
179    def __exit__(self, t, v, tb):
180        self.release()
181
182    # Internal methods used by condition variables
183
184    def _acquire_restore(self, state):
185        self._block.acquire()
186        self._count, self._owner = state
187
188    def _release_save(self):
189        if self._count == 0:
190            raise RuntimeError("cannot release un-acquired lock")
191        count = self._count
192        self._count = 0
193        owner = self._owner
194        self._owner = None
195        self._block.release()
196        return (count, owner)
197
198    def _is_owned(self):
199        return self._owner == get_ident()
200
201_PyRLock = _RLock
202
203
204class Condition:
205    """Class that implements a condition variable.
206
207    A condition variable allows one or more threads to wait until they are
208    notified by another thread.
209
210    If the lock argument is given and not None, it must be a Lock or RLock
211    object, and it is used as the underlying lock. Otherwise, a new RLock object
212    is created and used as the underlying lock.
213
214    """
215
216    def __init__(self, lock=None):
217        if lock is None:
218            lock = RLock()
219        self._lock = lock
220        # Export the lock's acquire() and release() methods
221        self.acquire = lock.acquire
222        self.release = lock.release
223        # If the lock defines _release_save() and/or _acquire_restore(),
224        # these override the default implementations (which just call
225        # release() and acquire() on the lock).  Ditto for _is_owned().
226        try:
227            self._release_save = lock._release_save
228        except AttributeError:
229            pass
230        try:
231            self._acquire_restore = lock._acquire_restore
232        except AttributeError:
233            pass
234        try:
235            self._is_owned = lock._is_owned
236        except AttributeError:
237            pass
238        self._waiters = _deque()
239
240    def __enter__(self):
241        return self._lock.__enter__()
242
243    def __exit__(self, *args):
244        return self._lock.__exit__(*args)
245
246    def __repr__(self):
247        return "<Condition(%s, %d)>" % (self._lock, len(self._waiters))
248
249    def _release_save(self):
250        self._lock.release()           # No state to save
251
252    def _acquire_restore(self, x):
253        self._lock.acquire()           # Ignore saved state
254
255    def _is_owned(self):
256        # Return True if lock is owned by current_thread.
257        # This method is called only if _lock doesn't have _is_owned().
258        if self._lock.acquire(0):
259            self._lock.release()
260            return False
261        else:
262            return True
263
264    def wait(self, timeout=None):
265        """Wait until notified or until a timeout occurs.
266
267        If the calling thread has not acquired the lock when this method is
268        called, a RuntimeError is raised.
269
270        This method releases the underlying lock, and then blocks until it is
271        awakened by a notify() or notify_all() call for the same condition
272        variable in another thread, or until the optional timeout occurs. Once
273        awakened or timed out, it re-acquires the lock and returns.
274
275        When the timeout argument is present and not None, it should be a
276        floating point number specifying a timeout for the operation in seconds
277        (or fractions thereof).
278
279        When the underlying lock is an RLock, it is not released using its
280        release() method, since this may not actually unlock the lock when it
281        was acquired multiple times recursively. Instead, an internal interface
282        of the RLock class is used, which really unlocks it even when it has
283        been recursively acquired several times. Another internal interface is
284        then used to restore the recursion level when the lock is reacquired.
285
286        """
287        if not self._is_owned():
288            raise RuntimeError("cannot wait on un-acquired lock")
289        waiter = _allocate_lock()
290        waiter.acquire()
291        self._waiters.append(waiter)
292        saved_state = self._release_save()
293        gotit = False
294        try:    # restore state no matter what (e.g., KeyboardInterrupt)
295            if timeout is None:
296                waiter.acquire()
297                gotit = True
298            else:
299                if timeout > 0:
300                    gotit = waiter.acquire(True, timeout)
301                else:
302                    gotit = waiter.acquire(False)
303            return gotit
304        finally:
305            self._acquire_restore(saved_state)
306            if not gotit:
307                try:
308                    self._waiters.remove(waiter)
309                except ValueError:
310                    pass
311
312    def wait_for(self, predicate, timeout=None):
313        """Wait until a condition evaluates to True.
314
315        predicate should be a callable which result will be interpreted as a
316        boolean value.  A timeout may be provided giving the maximum time to
317        wait.
318
319        """
320        endtime = None
321        waittime = timeout
322        result = predicate()
323        while not result:
324            if waittime is not None:
325                if endtime is None:
326                    endtime = _time() + waittime
327                else:
328                    waittime = endtime - _time()
329                    if waittime <= 0:
330                        break
331            self.wait(waittime)
332            result = predicate()
333        return result
334
335    def notify(self, n=1):
336        """Wake up one or more threads waiting on this condition, if any.
337
338        If the calling thread has not acquired the lock when this method is
339        called, a RuntimeError is raised.
340
341        This method wakes up at most n of the threads waiting for the condition
342        variable; it is a no-op if no threads are waiting.
343
344        """
345        if not self._is_owned():
346            raise RuntimeError("cannot notify on un-acquired lock")
347        all_waiters = self._waiters
348        waiters_to_notify = _deque(_islice(all_waiters, n))
349        if not waiters_to_notify:
350            return
351        for waiter in waiters_to_notify:
352            waiter.release()
353            try:
354                all_waiters.remove(waiter)
355            except ValueError:
356                pass
357
358    def notify_all(self):
359        """Wake up all threads waiting on this condition.
360
361        If the calling thread has not acquired the lock when this method
362        is called, a RuntimeError is raised.
363
364        """
365        self.notify(len(self._waiters))
366
367    notifyAll = notify_all
368
369
370class Semaphore:
371    """This class implements semaphore objects.
372
373    Semaphores manage a counter representing the number of release() calls minus
374    the number of acquire() calls, plus an initial value. The acquire() method
375    blocks if necessary until it can return without making the counter
376    negative. If not given, value defaults to 1.
377
378    """
379
380    # After Tim Peters' semaphore class, but not quite the same (no maximum)
381
382    def __init__(self, value=1):
383        if value < 0:
384            raise ValueError("semaphore initial value must be >= 0")
385        self._cond = Condition(Lock())
386        self._value = value
387
388    def acquire(self, blocking=True, timeout=None):
389        """Acquire a semaphore, decrementing the internal counter by one.
390
391        When invoked without arguments: if the internal counter is larger than
392        zero on entry, decrement it by one and return immediately. If it is zero
393        on entry, block, waiting until some other thread has called release() to
394        make it larger than zero. This is done with proper interlocking so that
395        if multiple acquire() calls are blocked, release() will wake exactly one
396        of them up. The implementation may pick one at random, so the order in
397        which blocked threads are awakened should not be relied on. There is no
398        return value in this case.
399
400        When invoked with blocking set to true, do the same thing as when called
401        without arguments, and return true.
402
403        When invoked with blocking set to false, do not block. If a call without
404        an argument would block, return false immediately; otherwise, do the
405        same thing as when called without arguments, and return true.
406
407        When invoked with a timeout other than None, it will block for at
408        most timeout seconds.  If acquire does not complete successfully in
409        that interval, return false.  Return true otherwise.
410
411        """
412        if not blocking and timeout is not None:
413            raise ValueError("can't specify timeout for non-blocking acquire")
414        rc = False
415        endtime = None
416        with self._cond:
417            while self._value == 0:
418                if not blocking:
419                    break
420                if timeout is not None:
421                    if endtime is None:
422                        endtime = _time() + timeout
423                    else:
424                        timeout = endtime - _time()
425                        if timeout <= 0:
426                            break
427                self._cond.wait(timeout)
428            else:
429                self._value -= 1
430                rc = True
431        return rc
432
433    __enter__ = acquire
434
435    def release(self):
436        """Release a semaphore, incrementing the internal counter by one.
437
438        When the counter is zero on entry and another thread is waiting for it
439        to become larger than zero again, wake up that thread.
440
441        """
442        with self._cond:
443            self._value += 1
444            self._cond.notify()
445
446    def __exit__(self, t, v, tb):
447        self.release()
448
449
450class BoundedSemaphore(Semaphore):
451    """Implements a bounded semaphore.
452
453    A bounded semaphore checks to make sure its current value doesn't exceed its
454    initial value. If it does, ValueError is raised. In most situations
455    semaphores are used to guard resources with limited capacity.
456
457    If the semaphore is released too many times it's a sign of a bug. If not
458    given, value defaults to 1.
459
460    Like regular semaphores, bounded semaphores manage a counter representing
461    the number of release() calls minus the number of acquire() calls, plus an
462    initial value. The acquire() method blocks if necessary until it can return
463    without making the counter negative. If not given, value defaults to 1.
464
465    """
466
467    def __init__(self, value=1):
468        Semaphore.__init__(self, value)
469        self._initial_value = value
470
471    def release(self):
472        """Release a semaphore, incrementing the internal counter by one.
473
474        When the counter is zero on entry and another thread is waiting for it
475        to become larger than zero again, wake up that thread.
476
477        If the number of releases exceeds the number of acquires,
478        raise a ValueError.
479
480        """
481        with self._cond:
482            if self._value >= self._initial_value:
483                raise ValueError("Semaphore released too many times")
484            self._value += 1
485            self._cond.notify()
486
487
488class Event:
489    """Class implementing event objects.
490
491    Events manage a flag that can be set to true with the set() method and reset
492    to false with the clear() method. The wait() method blocks until the flag is
493    true.  The flag is initially false.
494
495    """
496
497    # After Tim Peters' event class (without is_posted())
498
499    def __init__(self):
500        self._cond = Condition(Lock())
501        self._flag = False
502
503    def _reset_internal_locks(self):
504        # private!  called by Thread._reset_internal_locks by _after_fork()
505        self._cond.__init__(Lock())
506
507    def is_set(self):
508        """Return true if and only if the internal flag is true."""
509        return self._flag
510
511    isSet = is_set
512
513    def set(self):
514        """Set the internal flag to true.
515
516        All threads waiting for it to become true are awakened. Threads
517        that call wait() once the flag is true will not block at all.
518
519        """
520        with self._cond:
521            self._flag = True
522            self._cond.notify_all()
523
524    def clear(self):
525        """Reset the internal flag to false.
526
527        Subsequently, threads calling wait() will block until set() is called to
528        set the internal flag to true again.
529
530        """
531        with self._cond:
532            self._flag = False
533
534    def wait(self, timeout=None):
535        """Block until the internal flag is true.
536
537        If the internal flag is true on entry, return immediately. Otherwise,
538        block until another thread calls set() to set the flag to true, or until
539        the optional timeout occurs.
540
541        When the timeout argument is present and not None, it should be a
542        floating point number specifying a timeout for the operation in seconds
543        (or fractions thereof).
544
545        This method returns the internal flag on exit, so it will always return
546        True except if a timeout is given and the operation times out.
547
548        """
549        with self._cond:
550            signaled = self._flag
551            if not signaled:
552                signaled = self._cond.wait(timeout)
553            return signaled
554
555
556# A barrier class.  Inspired in part by the pthread_barrier_* api and
557# the CyclicBarrier class from Java.  See
558# http://sourceware.org/pthreads-win32/manual/pthread_barrier_init.html and
559# http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/
560#        CyclicBarrier.html
561# for information.
562# We maintain two main states, 'filling' and 'draining' enabling the barrier
563# to be cyclic.  Threads are not allowed into it until it has fully drained
564# since the previous cycle.  In addition, a 'resetting' state exists which is
565# similar to 'draining' except that threads leave with a BrokenBarrierError,
566# and a 'broken' state in which all threads get the exception.
567class Barrier:
568    """Implements a Barrier.
569
570    Useful for synchronizing a fixed number of threads at known synchronization
571    points.  Threads block on 'wait()' and are simultaneously awoken once they
572    have all made that call.
573
574    """
575
576    def __init__(self, parties, action=None, timeout=None):
577        """Create a barrier, initialised to 'parties' threads.
578
579        'action' is a callable which, when supplied, will be called by one of
580        the threads after they have all entered the barrier and just prior to
581        releasing them all. If a 'timeout' is provided, it is used as the
582        default for all subsequent 'wait()' calls.
583
584        """
585        self._cond = Condition(Lock())
586        self._action = action
587        self._timeout = timeout
588        self._parties = parties
589        self._state = 0 #0 filling, 1, draining, -1 resetting, -2 broken
590        self._count = 0
591
592    def wait(self, timeout=None):
593        """Wait for the barrier.
594
595        When the specified number of threads have started waiting, they are all
596        simultaneously awoken. If an 'action' was provided for the barrier, one
597        of the threads will have executed that callback prior to returning.
598        Returns an individual index number from 0 to 'parties-1'.
599
600        """
601        if timeout is None:
602            timeout = self._timeout
603        with self._cond:
604            self._enter() # Block while the barrier drains.
605            index = self._count
606            self._count += 1
607            try:
608                if index + 1 == self._parties:
609                    # We release the barrier
610                    self._release()
611                else:
612                    # We wait until someone releases us
613                    self._wait(timeout)
614                return index
615            finally:
616                self._count -= 1
617                # Wake up any threads waiting for barrier to drain.
618                self._exit()
619
620    # Block until the barrier is ready for us, or raise an exception
621    # if it is broken.
622    def _enter(self):
623        while self._state in (-1, 1):
624            # It is draining or resetting, wait until done
625            self._cond.wait()
626        #see if the barrier is in a broken state
627        if self._state < 0:
628            raise BrokenBarrierError
629        assert self._state == 0
630
631    # Optionally run the 'action' and release the threads waiting
632    # in the barrier.
633    def _release(self):
634        try:
635            if self._action:
636                self._action()
637            # enter draining state
638            self._state = 1
639            self._cond.notify_all()
640        except:
641            #an exception during the _action handler.  Break and reraise
642            self._break()
643            raise
644
645    # Wait in the barrier until we are released.  Raise an exception
646    # if the barrier is reset or broken.
647    def _wait(self, timeout):
648        if not self._cond.wait_for(lambda : self._state != 0, timeout):
649            #timed out.  Break the barrier
650            self._break()
651            raise BrokenBarrierError
652        if self._state < 0:
653            raise BrokenBarrierError
654        assert self._state == 1
655
656    # If we are the last thread to exit the barrier, signal any threads
657    # waiting for the barrier to drain.
658    def _exit(self):
659        if self._count == 0:
660            if self._state in (-1, 1):
661                #resetting or draining
662                self._state = 0
663                self._cond.notify_all()
664
665    def reset(self):
666        """Reset the barrier to the initial state.
667
668        Any threads currently waiting will get the BrokenBarrier exception
669        raised.
670
671        """
672        with self._cond:
673            if self._count > 0:
674                if self._state == 0:
675                    #reset the barrier, waking up threads
676                    self._state = -1
677                elif self._state == -2:
678                    #was broken, set it to reset state
679                    #which clears when the last thread exits
680                    self._state = -1
681            else:
682                self._state = 0
683            self._cond.notify_all()
684
685    def abort(self):
686        """Place the barrier into a 'broken' state.
687
688        Useful in case of error.  Any currently waiting threads and threads
689        attempting to 'wait()' will have BrokenBarrierError raised.
690
691        """
692        with self._cond:
693            self._break()
694
695    def _break(self):
696        # An internal error was detected.  The barrier is set to
697        # a broken state all parties awakened.
698        self._state = -2
699        self._cond.notify_all()
700
701    @property
702    def parties(self):
703        """Return the number of threads required to trip the barrier."""
704        return self._parties
705
706    @property
707    def n_waiting(self):
708        """Return the number of threads currently waiting at the barrier."""
709        # We don't need synchronization here since this is an ephemeral result
710        # anyway.  It returns the correct value in the steady state.
711        if self._state == 0:
712            return self._count
713        return 0
714
715    @property
716    def broken(self):
717        """Return True if the barrier is in a broken state."""
718        return self._state == -2
719
720# exception raised by the Barrier class
721class BrokenBarrierError(RuntimeError):
722    pass
723
724
725# Helper to generate new thread names
726_counter = _count().__next__
727_counter() # Consume 0 so first non-main thread has id 1.
728def _newname(template="Thread-%d"):
729    return template % _counter()
730
731# Active thread administration
732_active_limbo_lock = _allocate_lock()
733_active = {}    # maps thread id to Thread object
734_limbo = {}
735_dangling = WeakSet()
736
737# Main class for threads
738
739class Thread:
740    """A class that represents a thread of control.
741
742    This class can be safely subclassed in a limited fashion. There are two ways
743    to specify the activity: by passing a callable object to the constructor, or
744    by overriding the run() method in a subclass.
745
746    """
747
748    _initialized = False
749    # Need to store a reference to sys.exc_info for printing
750    # out exceptions when a thread tries to use a global var. during interp.
751    # shutdown and thus raises an exception about trying to perform some
752    # operation on/with a NoneType
753    _exc_info = _sys.exc_info
754    # Keep sys.exc_clear too to clear the exception just before
755    # allowing .join() to return.
756    #XXX __exc_clear = _sys.exc_clear
757
758    def __init__(self, group=None, target=None, name=None,
759                 args=(), kwargs=None, *, daemon=None):
760        """This constructor should always be called with keyword arguments. Arguments are:
761
762        *group* should be None; reserved for future extension when a ThreadGroup
763        class is implemented.
764
765        *target* is the callable object to be invoked by the run()
766        method. Defaults to None, meaning nothing is called.
767
768        *name* is the thread name. By default, a unique name is constructed of
769        the form "Thread-N" where N is a small decimal number.
770
771        *args* is the argument tuple for the target invocation. Defaults to ().
772
773        *kwargs* is a dictionary of keyword arguments for the target
774        invocation. Defaults to {}.
775
776        If a subclass overrides the constructor, it must make sure to invoke
777        the base class constructor (Thread.__init__()) before doing anything
778        else to the thread.
779
780        """
781        assert group is None, "group argument must be None for now"
782        if kwargs is None:
783            kwargs = {}
784        self._target = target
785        self._name = str(name or _newname())
786        self._args = args
787        self._kwargs = kwargs
788        if daemon is not None:
789            self._daemonic = daemon
790        else:
791            self._daemonic = current_thread().daemon
792        self._ident = None
793        self._tstate_lock = None
794        self._started = Event()
795        self._is_stopped = False
796        self._initialized = True
797        # sys.stderr is not stored in the class like
798        # sys.exc_info since it can be changed between instances
799        self._stderr = _sys.stderr
800        # For debugging and _after_fork()
801        _dangling.add(self)
802
803    def _reset_internal_locks(self, is_alive):
804        # private!  Called by _after_fork() to reset our internal locks as
805        # they may be in an invalid state leading to a deadlock or crash.
806        self._started._reset_internal_locks()
807        if is_alive:
808            self._set_tstate_lock()
809        else:
810            # The thread isn't alive after fork: it doesn't have a tstate
811            # anymore.
812            self._is_stopped = True
813            self._tstate_lock = None
814
815    def __repr__(self):
816        assert self._initialized, "Thread.__init__() was not called"
817        status = "initial"
818        if self._started.is_set():
819            status = "started"
820        self.is_alive() # easy way to get ._is_stopped set when appropriate
821        if self._is_stopped:
822            status = "stopped"
823        if self._daemonic:
824            status += " daemon"
825        if self._ident is not None:
826            status += " %s" % self._ident
827        return "<%s(%s, %s)>" % (self.__class__.__name__, self._name, status)
828
829    def start(self):
830        """Start the thread's activity.
831
832        It must be called at most once per thread object. It arranges for the
833        object's run() method to be invoked in a separate thread of control.
834
835        This method will raise a RuntimeError if called more than once on the
836        same thread object.
837
838        """
839        if not self._initialized:
840            raise RuntimeError("thread.__init__() not called")
841
842        if self._started.is_set():
843            raise RuntimeError("threads can only be started once")
844        with _active_limbo_lock:
845            _limbo[self] = self
846        try:
847            _start_new_thread(self._bootstrap, ())
848        except Exception:
849            with _active_limbo_lock:
850                del _limbo[self]
851            raise
852        self._started.wait()
853
854    def run(self):
855        """Method representing the thread's activity.
856
857        You may override this method in a subclass. The standard run() method
858        invokes the callable object passed to the object's constructor as the
859        target argument, if any, with sequential and keyword arguments taken
860        from the args and kwargs arguments, respectively.
861
862        """
863        try:
864            if self._target:
865                self._target(*self._args, **self._kwargs)
866        finally:
867            # Avoid a refcycle if the thread is running a function with
868            # an argument that has a member that points to the thread.
869            del self._target, self._args, self._kwargs
870
871    def _bootstrap(self):
872        # Wrapper around the real bootstrap code that ignores
873        # exceptions during interpreter cleanup.  Those typically
874        # happen when a daemon thread wakes up at an unfortunate
875        # moment, finds the world around it destroyed, and raises some
876        # random exception *** while trying to report the exception in
877        # _bootstrap_inner() below ***.  Those random exceptions
878        # don't help anybody, and they confuse users, so we suppress
879        # them.  We suppress them only when it appears that the world
880        # indeed has already been destroyed, so that exceptions in
881        # _bootstrap_inner() during normal business hours are properly
882        # reported.  Also, we only suppress them for daemonic threads;
883        # if a non-daemonic encounters this, something else is wrong.
884        try:
885            self._bootstrap_inner()
886        except:
887            if self._daemonic and _sys is None:
888                return
889            raise
890
891    def _set_ident(self):
892        self._ident = get_ident()
893
894    def _set_tstate_lock(self):
895        """
896        Set a lock object which will be released by the interpreter when
897        the underlying thread state (see pystate.h) gets deleted.
898        """
899        self._tstate_lock = _set_sentinel()
900        self._tstate_lock.acquire()
901
902    def _bootstrap_inner(self):
903        try:
904            self._set_ident()
905            self._set_tstate_lock()
906            self._started.set()
907            with _active_limbo_lock:
908                _active[self._ident] = self
909                del _limbo[self]
910
911            if _trace_hook:
912                _sys.settrace(_trace_hook)
913            if _profile_hook:
914                _sys.setprofile(_profile_hook)
915
916            try:
917                self.run()
918            except SystemExit:
919                pass
920            except:
921                # If sys.stderr is no more (most likely from interpreter
922                # shutdown) use self._stderr.  Otherwise still use sys (as in
923                # _sys) in case sys.stderr was redefined since the creation of
924                # self.
925                if _sys and _sys.stderr is not None:
926                    print("Exception in thread %s:\n%s" %
927                          (self.name, _format_exc()), file=_sys.stderr)
928                elif self._stderr is not None:
929                    # Do the best job possible w/o a huge amt. of code to
930                    # approximate a traceback (code ideas from
931                    # Lib/traceback.py)
932                    exc_type, exc_value, exc_tb = self._exc_info()
933                    try:
934                        print((
935                            "Exception in thread " + self.name +
936                            " (most likely raised during interpreter shutdown):"), file=self._stderr)
937                        print((
938                            "Traceback (most recent call last):"), file=self._stderr)
939                        while exc_tb:
940                            print((
941                                '  File "%s", line %s, in %s' %
942                                (exc_tb.tb_frame.f_code.co_filename,
943                                    exc_tb.tb_lineno,
944                                    exc_tb.tb_frame.f_code.co_name)), file=self._stderr)
945                            exc_tb = exc_tb.tb_next
946                        print(("%s: %s" % (exc_type, exc_value)), file=self._stderr)
947                        self._stderr.flush()
948                    # Make sure that exc_tb gets deleted since it is a memory
949                    # hog; deleting everything else is just for thoroughness
950                    finally:
951                        del exc_type, exc_value, exc_tb
952            finally:
953                # Prevent a race in
954                # test_threading.test_no_refcycle_through_target when
955                # the exception keeps the target alive past when we
956                # assert that it's dead.
957                #XXX self._exc_clear()
958                pass
959        finally:
960            with _active_limbo_lock:
961                try:
962                    # We don't call self._delete() because it also
963                    # grabs _active_limbo_lock.
964                    del _active[get_ident()]
965                except:
966                    pass
967
968    def _stop(self):
969        # After calling ._stop(), .is_alive() returns False and .join() returns
970        # immediately.  ._tstate_lock must be released before calling ._stop().
971        #
972        # Normal case:  C code at the end of the thread's life
973        # (release_sentinel in _threadmodule.c) releases ._tstate_lock, and
974        # that's detected by our ._wait_for_tstate_lock(), called by .join()
975        # and .is_alive().  Any number of threads _may_ call ._stop()
976        # simultaneously (for example, if multiple threads are blocked in
977        # .join() calls), and they're not serialized.  That's harmless -
978        # they'll just make redundant rebindings of ._is_stopped and
979        # ._tstate_lock.  Obscure:  we rebind ._tstate_lock last so that the
980        # "assert self._is_stopped" in ._wait_for_tstate_lock() always works
981        # (the assert is executed only if ._tstate_lock is None).
982        #
983        # Special case:  _main_thread releases ._tstate_lock via this
984        # module's _shutdown() function.
985        lock = self._tstate_lock
986        if lock is not None:
987            assert not lock.locked()
988        self._is_stopped = True
989        self._tstate_lock = None
990
991    def _delete(self):
992        "Remove current thread from the dict of currently running threads."
993        with _active_limbo_lock:
994            del _active[get_ident()]
995            # There must not be any python code between the previous line
996            # and after the lock is released.  Otherwise a tracing function
997            # could try to acquire the lock again in the same thread, (in
998            # current_thread()), and would block.
999
1000    def join(self, timeout=None):
1001        """Wait until the thread terminates.
1002
1003        This blocks the calling thread until the thread whose join() method is
1004        called terminates -- either normally or through an unhandled exception
1005        or until the optional timeout occurs.
1006
1007        When the timeout argument is present and not None, it should be a
1008        floating point number specifying a timeout for the operation in seconds
1009        (or fractions thereof). As join() always returns None, you must call
1010        is_alive() after join() to decide whether a timeout happened -- if the
1011        thread is still alive, the join() call timed out.
1012
1013        When the timeout argument is not present or None, the operation will
1014        block until the thread terminates.
1015
1016        A thread can be join()ed many times.
1017
1018        join() raises a RuntimeError if an attempt is made to join the current
1019        thread as that would cause a deadlock. It is also an error to join() a
1020        thread before it has been started and attempts to do so raises the same
1021        exception.
1022
1023        """
1024        if not self._initialized:
1025            raise RuntimeError("Thread.__init__() not called")
1026        if not self._started.is_set():
1027            raise RuntimeError("cannot join thread before it is started")
1028        if self is current_thread():
1029            raise RuntimeError("cannot join current thread")
1030
1031        if timeout is None:
1032            self._wait_for_tstate_lock()
1033        else:
1034            # the behavior of a negative timeout isn't documented, but
1035            # historically .join(timeout=x) for x<0 has acted as if timeout=0
1036            self._wait_for_tstate_lock(timeout=max(timeout, 0))
1037
1038    def _wait_for_tstate_lock(self, block=True, timeout=-1):
1039        # Issue #18808: wait for the thread state to be gone.
1040        # At the end of the thread's life, after all knowledge of the thread
1041        # is removed from C data structures, C code releases our _tstate_lock.
1042        # This method passes its arguments to _tstate_lock.acquire().
1043        # If the lock is acquired, the C code is done, and self._stop() is
1044        # called.  That sets ._is_stopped to True, and ._tstate_lock to None.
1045        lock = self._tstate_lock
1046        if lock is None:  # already determined that the C code is done
1047            assert self._is_stopped
1048        elif lock.acquire(block, timeout):
1049            lock.release()
1050            self._stop()
1051
1052    @property
1053    def name(self):
1054        """A string used for identification purposes only.
1055
1056        It has no semantics. Multiple threads may be given the same name. The
1057        initial name is set by the constructor.
1058
1059        """
1060        assert self._initialized, "Thread.__init__() not called"
1061        return self._name
1062
1063    @name.setter
1064    def name(self, name):
1065        assert self._initialized, "Thread.__init__() not called"
1066        self._name = str(name)
1067
1068    @property
1069    def ident(self):
1070        """Thread identifier of this thread or None if it has not been started.
1071
1072        This is a nonzero integer. See the get_ident() function. Thread
1073        identifiers may be recycled when a thread exits and another thread is
1074        created. The identifier is available even after the thread has exited.
1075
1076        """
1077        assert self._initialized, "Thread.__init__() not called"
1078        return self._ident
1079
1080    def is_alive(self):
1081        """Return whether the thread is alive.
1082
1083        This method returns True just before the run() method starts until just
1084        after the run() method terminates. The module function enumerate()
1085        returns a list of all alive threads.
1086
1087        """
1088        assert self._initialized, "Thread.__init__() not called"
1089        if self._is_stopped or not self._started.is_set():
1090            return False
1091        self._wait_for_tstate_lock(False)
1092        return not self._is_stopped
1093
1094    def isAlive(self):
1095        """Return whether the thread is alive.
1096
1097        This method is deprecated, use is_alive() instead.
1098        """
1099        import warnings
1100        warnings.warn('isAlive() is deprecated, use is_alive() instead',
1101                      PendingDeprecationWarning, stacklevel=2)
1102        return self.is_alive()
1103
1104    @property
1105    def daemon(self):
1106        """A boolean value indicating whether this thread is a daemon thread.
1107
1108        This must be set before start() is called, otherwise RuntimeError is
1109        raised. Its initial value is inherited from the creating thread; the
1110        main thread is not a daemon thread and therefore all threads created in
1111        the main thread default to daemon = False.
1112
1113        The entire Python program exits when no alive non-daemon threads are
1114        left.
1115
1116        """
1117        assert self._initialized, "Thread.__init__() not called"
1118        return self._daemonic
1119
1120    @daemon.setter
1121    def daemon(self, daemonic):
1122        if not self._initialized:
1123            raise RuntimeError("Thread.__init__() not called")
1124        if self._started.is_set():
1125            raise RuntimeError("cannot set daemon status of active thread")
1126        self._daemonic = daemonic
1127
1128    def isDaemon(self):
1129        return self.daemon
1130
1131    def setDaemon(self, daemonic):
1132        self.daemon = daemonic
1133
1134    def getName(self):
1135        return self.name
1136
1137    def setName(self, name):
1138        self.name = name
1139
1140# The timer class was contributed by Itamar Shtull-Trauring
1141
1142class Timer(Thread):
1143    """Call a function after a specified number of seconds:
1144
1145            t = Timer(30.0, f, args=None, kwargs=None)
1146            t.start()
1147            t.cancel()     # stop the timer's action if it's still waiting
1148
1149    """
1150
1151    def __init__(self, interval, function, args=None, kwargs=None):
1152        Thread.__init__(self)
1153        self.interval = interval
1154        self.function = function
1155        self.args = args if args is not None else []
1156        self.kwargs = kwargs if kwargs is not None else {}
1157        self.finished = Event()
1158
1159    def cancel(self):
1160        """Stop the timer if it hasn't finished yet."""
1161        self.finished.set()
1162
1163    def run(self):
1164        self.finished.wait(self.interval)
1165        if not self.finished.is_set():
1166            self.function(*self.args, **self.kwargs)
1167        self.finished.set()
1168
1169
1170# Special thread class to represent the main thread
1171
1172class _MainThread(Thread):
1173
1174    def __init__(self):
1175        Thread.__init__(self, name="MainThread", daemon=False)
1176        self._set_tstate_lock()
1177        self._started.set()
1178        self._set_ident()
1179        with _active_limbo_lock:
1180            _active[self._ident] = self
1181
1182
1183# Dummy thread class to represent threads not started here.
1184# These aren't garbage collected when they die, nor can they be waited for.
1185# If they invoke anything in threading.py that calls current_thread(), they
1186# leave an entry in the _active dict forever after.
1187# Their purpose is to return *something* from current_thread().
1188# They are marked as daemon threads so we won't wait for them
1189# when we exit (conform previous semantics).
1190
1191class _DummyThread(Thread):
1192
1193    def __init__(self):
1194        Thread.__init__(self, name=_newname("Dummy-%d"), daemon=True)
1195
1196        self._started.set()
1197        self._set_ident()
1198        with _active_limbo_lock:
1199            _active[self._ident] = self
1200
1201    def _stop(self):
1202        pass
1203
1204    def is_alive(self):
1205        assert not self._is_stopped and self._started.is_set()
1206        return True
1207
1208    def join(self, timeout=None):
1209        assert False, "cannot join a dummy thread"
1210
1211
1212# Global API functions
1213
1214def current_thread():
1215    """Return the current Thread object, corresponding to the caller's thread of control.
1216
1217    If the caller's thread of control was not created through the threading
1218    module, a dummy thread object with limited functionality is returned.
1219
1220    """
1221    try:
1222        return _active[get_ident()]
1223    except KeyError:
1224        return _DummyThread()
1225
1226currentThread = current_thread
1227
1228def active_count():
1229    """Return the number of Thread objects currently alive.
1230
1231    The returned count is equal to the length of the list returned by
1232    enumerate().
1233
1234    """
1235    with _active_limbo_lock:
1236        return len(_active) + len(_limbo)
1237
1238activeCount = active_count
1239
1240def _enumerate():
1241    # Same as enumerate(), but without the lock. Internal use only.
1242    return list(_active.values()) + list(_limbo.values())
1243
1244def enumerate():
1245    """Return a list of all Thread objects currently alive.
1246
1247    The list includes daemonic threads, dummy thread objects created by
1248    current_thread(), and the main thread. It excludes terminated threads and
1249    threads that have not yet been started.
1250
1251    """
1252    with _active_limbo_lock:
1253        return list(_active.values()) + list(_limbo.values())
1254
1255from _thread import stack_size
1256
1257# Create the main thread object,
1258# and make it available for the interpreter
1259# (Py_Main) as threading._shutdown.
1260
1261_main_thread = _MainThread()
1262
1263def _shutdown():
1264    # Obscure:  other threads may be waiting to join _main_thread.  That's
1265    # dubious, but some code does it.  We can't wait for C code to release
1266    # the main thread's tstate_lock - that won't happen until the interpreter
1267    # is nearly dead.  So we release it here.  Note that just calling _stop()
1268    # isn't enough:  other threads may already be waiting on _tstate_lock.
1269    if _main_thread._is_stopped:
1270        # _shutdown() was already called
1271        return
1272    tlock = _main_thread._tstate_lock
1273    # The main thread isn't finished yet, so its thread state lock can't have
1274    # been released.
1275    assert tlock is not None
1276    assert tlock.locked()
1277    tlock.release()
1278    _main_thread._stop()
1279    t = _pickSomeNonDaemonThread()
1280    while t:
1281        t.join()
1282        t = _pickSomeNonDaemonThread()
1283
1284def _pickSomeNonDaemonThread():
1285    for t in enumerate():
1286        if not t.daemon and t.is_alive():
1287            return t
1288    return None
1289
1290def main_thread():
1291    """Return the main thread object.
1292
1293    In normal conditions, the main thread is the thread from which the
1294    Python interpreter was started.
1295    """
1296    return _main_thread
1297
1298# get thread-local implementation, either from the thread
1299# module, or from the python fallback
1300
1301try:
1302    from _thread import _local as local
1303except ImportError:
1304    from _threading_local import local
1305
1306
1307def _after_fork():
1308    """
1309    Cleanup threading module state that should not exist after a fork.
1310    """
1311    # Reset _active_limbo_lock, in case we forked while the lock was held
1312    # by another (non-forked) thread.  http://bugs.python.org/issue874900
1313    global _active_limbo_lock, _main_thread
1314    _active_limbo_lock = _allocate_lock()
1315
1316    # fork() only copied the current thread; clear references to others.
1317    new_active = {}
1318    current = current_thread()
1319    _main_thread = current
1320    with _active_limbo_lock:
1321        # Dangling thread instances must still have their locks reset,
1322        # because someone may join() them.
1323        threads = set(_enumerate())
1324        threads.update(_dangling)
1325        for thread in threads:
1326            # Any lock/condition variable may be currently locked or in an
1327            # invalid state, so we reinitialize them.
1328            if thread is current:
1329                # There is only one active thread. We reset the ident to
1330                # its new value since it can have changed.
1331                thread._reset_internal_locks(True)
1332                ident = get_ident()
1333                thread._ident = ident
1334                new_active[ident] = thread
1335            else:
1336                # All the others are already stopped.
1337                thread._reset_internal_locks(False)
1338                thread._stop()
1339
1340        _limbo.clear()
1341        _active.clear()
1342        _active.update(new_active)
1343        assert len(_active) == 1
1344
1345
1346if hasattr(_os, "register_at_fork"):
1347    _os.register_at_fork(after_in_child=_after_fork)
1348