• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Thread module emulating a subset of Java's threading model."""
2
3import sys as _sys
4
5try:
6    import thread
7except ImportError:
8    del _sys.modules[__name__]
9    raise
10
11import warnings
12
13from collections import deque as _deque
14from itertools import count as _count
15from time import time as _time, sleep as _sleep
16from traceback import format_exc as _format_exc
17
18# Note regarding PEP 8 compliant aliases
19#  This threading model was originally inspired by Java, and inherited
20# the convention of camelCase function and method names from that
21# language. While those names are not in any imminent danger of being
22# deprecated, starting with Python 2.6, the module now provides a
23# PEP 8 compliant alias for any such method name.
24# Using the new PEP 8 compliant names also facilitates substitution
25# with the multiprocessing module, which doesn't provide the old
26# Java inspired names.
27
28
# Names exported by "from threading import *".  Everything else defined in
# this module is either private (leading underscore) or must be imported
# explicitly.
__all__ = ['activeCount', 'active_count', 'Condition', 'currentThread',
           'current_thread', 'enumerate', 'Event',
           'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
           'Timer', 'setprofile', 'settrace', 'local', 'stack_size']

# Bind the low-level primitives of the "thread" module to module-level
# names, then delete the module reference itself so that "thread" is not
# left behind in this namespace.
_start_new_thread = thread.start_new_thread
_allocate_lock = thread.allocate_lock
_get_ident = thread.get_ident
ThreadError = thread.error
del thread


# sys.exc_clear is used to work around the fact that except blocks
# don't fully clear the exception until 3.0.
# The filter below silences the DeprecationWarning whose message starts
# with 'sys.exc_clear' when it is attributed to the 'threading' module.
warnings.filterwarnings('ignore', category=DeprecationWarning,
                        module='threading', message='sys.exc_clear')

# Debug support (adapted from ihooks.py).
# All the major classes here derive from _Verbose.  We force that to
# be a new-style class so that all the major classes here are new-style.
# This helps debugging (type(instance) is more revealing for instances
# of new-style classes).

# Default used by _Verbose.__init__() when its "verbose" argument is None.
_VERBOSE = False
54
if __debug__:

    class _Verbose(object):
        """Base class that gives its subclasses optional debug tracing.

        When tracing is enabled for an instance, _note() writes one line per
        call to sys.stderr, prefixed with the name of the calling thread.
        """

        def __init__(self, verbose=None):
            # No explicit choice from the caller: use the module-wide default.
            self.__verbose = _VERBOSE if verbose is None else verbose

        def _note(self, format, *args):
            """Emit "format % args" on stderr if this instance is verbose."""
            if not self.__verbose:
                return
            message = format % args
            # Issue #4188: calling current_thread() can incur an infinite
            # recursion if it has to create a DummyThread on the fly, so the
            # name is looked up directly in the _active table instead.
            ident = _get_ident()
            try:
                name = _active[ident].name
            except KeyError:
                name = "<OS thread %d>" % ident
            _sys.stderr.write("%s: %s\n" % (name, message))

else:
    # Disable this when using "python -O"
    class _Verbose(object):
        """Do-nothing replacement used when __debug__ is false."""

        def __init__(self, verbose=None):
            pass

        def _note(self, *args):
            pass
84
85# Support for profile and trace hooks
86
# Hook functions recorded by setprofile() and settrace(); None means that
# no hook has been set.
_profile_hook = None
_trace_hook = None
89
def setprofile(func):
    """Register *func* as the profile function for threads of this module.

    The function is remembered in a module-level variable; every thread
    started through the threading module hands it to sys.setprofile()
    before its run() method is called.

    """
    global _profile_hook
    _profile_hook = func
99
def settrace(func):
    """Register *func* as the trace function for threads of this module.

    The function is remembered in a module-level variable; every thread
    started through the threading module hands it to sys.settrace()
    before its run() method is called.

    """
    global _trace_hook
    _trace_hook = func
109
110# Synchronization classes
111
# Lock is the low-level lock allocator itself: calling Lock() returns a new
# lock object from the thread module.
Lock = _allocate_lock
113
def RLock(*args, **kwargs):
    """Create and return a new reentrant lock.

    Only the thread that acquired a reentrant lock may release it.  The
    owning thread can acquire the lock again without blocking, and has to
    release it exactly as many times as it acquired it.

    All arguments are forwarded unchanged to the _RLock constructor.

    """
    lock = _RLock(*args, **kwargs)
    return lock
124
class _RLock(_Verbose):
    """Reentrant lock layered on top of a plain lock.

       The lock remembers which thread owns it and how many times that
       thread has acquired it.  The owner may re-acquire without blocking
       and must release once per acquisition; only the owner may release.
    """

    def __init__(self, verbose=None):
        _Verbose.__init__(self, verbose)
        self.__block = _allocate_lock()   # the underlying non-reentrant lock
        self.__owner = None               # ident of the owning thread, if any
        self.__count = 0                  # current recursion depth

    def __repr__(self):
        # Show the owner's thread name when it is a known thread, otherwise
        # the raw owner value (a thread ident or None).
        holder = self.__owner
        try:
            holder = _active[holder].name
        except KeyError:
            pass
        return "<%s owner=%r count=%d>" % (
                self.__class__.__name__, holder, self.__count)

    def acquire(self, blocking=1):
        """Acquire the lock, blocking or non-blocking.

        If the calling thread already owns the lock, the recursion level is
        incremented and 1 is returned immediately.  Otherwise the underlying
        lock is acquired with the given *blocking* flag; on success the
        caller becomes the owner with a recursion level of one.

        The return value is true when the lock was acquired and false when a
        non-blocking attempt failed.

        """
        caller = _get_ident()
        if self.__owner == caller:
            # Re-entry by the current owner never touches the real lock.
            self.__count += 1
            if __debug__:
                self._note("%s.acquire(%s): recursive success", self, blocking)
            return 1
        rc = self.__block.acquire(blocking)
        if not rc:
            if __debug__:
                self._note("%s.acquire(%s): failure", self, blocking)
            return rc
        self.__owner = caller
        self.__count = 1
        if __debug__:
            self._note("%s.acquire(%s): initial success", self, blocking)
        return rc

    __enter__ = acquire

    def release(self):
        """Release the lock, decrementing the recursion level.

        When the level drops to zero the lock becomes unowned and the
        underlying lock is released, which lets one waiting thread proceed.
        While the level is still nonzero the calling thread keeps the lock.

        Raises RuntimeError if the calling thread does not own the lock.
        There is no return value.

        """
        if self.__owner != _get_ident():
            raise RuntimeError("cannot release un-acquired lock")
        remaining = self.__count - 1
        self.__count = remaining
        if remaining:
            if __debug__:
                self._note("%s.release(): non-final release", self)
            return
        # Clear ownership before letting another thread in.
        self.__owner = None
        self.__block.release()
        if __debug__:
            self._note("%s.release(): final release", self)

    def __exit__(self, t, v, tb):
        self.release()

    # Internal methods used by condition variables

    def _acquire_restore(self, count_owner):
        # Re-take the underlying lock and reinstate the (count, owner) pair
        # previously returned by _release_save().
        saved_count, saved_owner = count_owner
        self.__block.acquire()
        self.__count = saved_count
        self.__owner = saved_owner
        if __debug__:
            self._note("%s._acquire_restore()", self)

    def _release_save(self):
        # Fully release the lock regardless of recursion depth and return
        # the (count, owner) state needed by _acquire_restore().
        if __debug__:
            self._note("%s._release_save()", self)
        saved = (self.__count, self.__owner)
        self.__count = 0
        self.__owner = None
        self.__block.release()
        return saved

    def _is_owned(self):
        # True when the calling thread is the current owner.
        return self.__owner == _get_ident()
240
241
def Condition(*args, **kwargs):
    """Create and return a new condition variable object.

    A condition variable lets one or more threads wait until another thread
    notifies them.

    When a lock argument is supplied and is not None it must be a Lock or
    RLock object and becomes the underlying lock; otherwise a fresh RLock is
    created for that purpose.

    """
    cond = _Condition(*args, **kwargs)
    return cond
254
class _Condition(_Verbose):
    """Condition variables allow one or more threads to wait until they are
       notified by another thread.

       Each waiting thread blocks on a private lock that is kept in an
       internal waiters list; notify() releases those private locks.
    """

    def __init__(self, lock=None, verbose=None):
        _Verbose.__init__(self, verbose)
        if lock is None:
            lock = RLock()
        self.__lock = lock
        # Export the lock's acquire() and release() methods
        self.acquire = lock.acquire
        self.release = lock.release
        # If the lock defines _release_save() and/or _acquire_restore(),
        # these override the default implementations (which just call
        # release() and acquire() on the lock).  Ditto for _is_owned().
        try:
            self._release_save = lock._release_save
        except AttributeError:
            pass
        try:
            self._acquire_restore = lock._acquire_restore
        except AttributeError:
            pass
        try:
            self._is_owned = lock._is_owned
        except AttributeError:
            pass
        # One private lock per thread currently blocked in wait().
        self.__waiters = []

    def __enter__(self):
        return self.__lock.__enter__()

    def __exit__(self, *args):
        return self.__lock.__exit__(*args)

    def __repr__(self):
        return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))

    def _release_save(self):
        self.__lock.release()           # No state to save

    def _acquire_restore(self, x):
        self.__lock.acquire()           # Ignore saved state

    def _is_owned(self):
        # Return True if lock is owned by current_thread.
        # This method is called only if __lock doesn't have _is_owned().
        # A successful non-blocking acquire means nobody held the lock, so
        # it is given straight back and "not owned" is reported.
        if self.__lock.acquire(0):
            self.__lock.release()
            return False
        else:
            return True

    def wait(self, timeout=None):
        """Wait until notified or until a timeout occurs.

        If the calling thread has not acquired the lock when this method is
        called, a RuntimeError is raised.

        This method releases the underlying lock, and then blocks until it is
        awakened by a notify() or notifyAll() call for the same condition
        variable in another thread, or until the optional timeout occurs. Once
        awakened or timed out, it re-acquires the lock and returns.

        When the timeout argument is present and not None, it should be a
        floating point number specifying a timeout for the operation in seconds
        (or fractions thereof).

        When the underlying lock is an RLock, it is not released using its
        release() method, since this may not actually unlock the lock when it
        was acquired multiple times recursively. Instead, an internal interface
        of the RLock class is used, which really unlocks it even when it has
        been recursively acquired several times. Another internal interface is
        then used to restore the recursion level when the lock is reacquired.

        If the wait is aborted by an exception (for example a
        KeyboardInterrupt), the caller is removed from the list of waiters
        before the exception propagates, so that a later notify() is not
        spent on a thread that is no longer waiting.

        There is no return value.

        """
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        # The waiter lock starts out held; notify() releases it to wake us.
        waiter = _allocate_lock()
        waiter.acquire()
        self.__waiters.append(waiter)
        saved_state = self._release_save()
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                waiter.acquire()
                if __debug__:
                    self._note("%s.wait(): got it", self)
            else:
                # Balancing act:  We can't afford a pure busy loop, so we
                # have to sleep; but if we sleep the whole timeout time,
                # we'll be unresponsive.  The scheme here sleeps very
                # little at first, longer as time goes on, but never longer
                # than 0.05 seconds per sleep (or the timeout time
                # remaining), i.e. it polls at least 20 times per second.
                endtime = _time() + timeout
                delay = 0.0005 # 500 us -> initial delay of 1 ms
                while True:
                    gotit = waiter.acquire(0)
                    if gotit:
                        break
                    remaining = endtime - _time()
                    if remaining <= 0:
                        break
                    delay = min(delay * 2, remaining, .05)
                    _sleep(delay)
                if not gotit:
                    if __debug__:
                        self._note("%s.wait(%s): timed out", self, timeout)
                    # notify() may have removed the waiter concurrently.
                    try:
                        self.__waiters.remove(waiter)
                    except ValueError:
                        pass
                else:
                    if __debug__:
                        self._note("%s.wait(%s): got it", self, timeout)
        except:
            # The wait was interrupted: unregister the waiter so that it
            # cannot swallow a future notification, then let the exception
            # continue on its way.
            try:
                self.__waiters.remove(waiter)
            except ValueError:
                pass
            raise
        finally:
            self._acquire_restore(saved_state)

    def notify(self, n=1):
        """Wake up one or more threads waiting on this condition, if any.

        If the calling thread has not acquired the lock when this method is
        called, a RuntimeError is raised.

        This method wakes up at most n of the threads waiting for the condition
        variable; it is a no-op if no threads are waiting.

        """
        if not self._is_owned():
            raise RuntimeError("cannot notify on un-acquired lock")
        all_waiters = self.__waiters
        # Work on a snapshot so the list can be edited while looping.
        to_wake = all_waiters[:n]
        if not to_wake:
            if __debug__:
                self._note("%s.notify(): no waiters", self)
            return
        if __debug__:
            # Report how many waiters are really being woken, which may be
            # fewer than the n that was asked for.
            woken = len(to_wake)
            self._note("%s.notify(): notifying %d waiter%s", self, woken,
                       "" if woken == 1 else "s")
        for waiter in to_wake:
            waiter.release()
            # A timed-out wait() may have removed the waiter already.
            try:
                all_waiters.remove(waiter)
            except ValueError:
                pass

    def notifyAll(self):
        """Wake up all threads waiting on this condition.

        If the calling thread has not acquired the lock when this method
        is called, a RuntimeError is raised.

        """
        self.notify(len(self.__waiters))

    notify_all = notifyAll
410
411
def Semaphore(*args, **kwargs):
    """Create and return a new semaphore.

    A semaphore maintains a counter equal to its initial value plus the
    number of release() calls minus the number of acquire() calls.
    acquire() blocks when necessary so that the counter never becomes
    negative.  The initial value defaults to 1.

    """
    sem = _Semaphore(*args, **kwargs)
    return sem
422
class _Semaphore(_Verbose):
    """Counting semaphore.

       The internal counter equals the initial value plus the number of
       release() calls minus the number of acquire() calls.  acquire()
       blocks when necessary so that the counter never becomes negative.
       The initial value defaults to 1.

    """

    # After Tim Peters' semaphore class, but not quite the same (no maximum)

    def __init__(self, value=1, verbose=None):
        if value < 0:
            raise ValueError("semaphore initial value must be >= 0")
        _Verbose.__init__(self, verbose)
        # The condition both protects the counter and parks blocked callers.
        self.__cond = Condition(Lock())
        self.__value = value

    def acquire(self, blocking=1):
        """Acquire the semaphore, decrementing the internal counter by one.

        If the counter is positive it is decremented and True is returned.
        If the counter is zero and *blocking* is true, the call waits until
        another thread calls release(); each release() wakes at most one
        blocked acquire(), and the wake-up order should not be relied on.
        If the counter is zero and *blocking* is false, False is returned
        immediately without waiting.

        """
        cond = self.__cond
        with cond:
            while self.__value == 0:
                if not blocking:
                    return False
                if __debug__:
                    self._note("%s.acquire(%s): blocked waiting, value=%s",
                            self, blocking, self.__value)
                cond.wait()
            self.__value -= 1
            if __debug__:
                self._note("%s.acquire: success, value=%s",
                        self, self.__value)
            return True

    __enter__ = acquire

    def release(self):
        """Release the semaphore, incrementing the internal counter by one.

        If another thread is blocked in acquire() waiting for the counter to
        become positive again, one such thread is woken up.

        """
        cond = self.__cond
        with cond:
            self.__value += 1
            if __debug__:
                self._note("%s.release: success, value=%s",
                        self, self.__value)
            cond.notify()

    def __exit__(self, t, v, tb):
        self.release()
495
496
def BoundedSemaphore(*args, **kwargs):
    """Create and return a new bounded semaphore.

    A bounded semaphore behaves like a regular semaphore: its counter is the
    initial value plus the number of release() calls minus the number of
    acquire() calls, and acquire() blocks when necessary so the counter never
    becomes negative.  The initial value defaults to 1.

    In addition, a release() that would push the counter above its initial
    value raises ValueError.  Semaphores usually guard resources of limited
    capacity, so releasing too many times is a sign of a bug.

    """
    sem = _BoundedSemaphore(*args, **kwargs)
    return sem
514
class _BoundedSemaphore(_Semaphore):
    """Semaphore whose counter may never exceed its initial value.

       A release() that would exceed the initial value raises ValueError.
       Semaphores usually guard resources with limited capacity.
    """

    def __init__(self, value=1, verbose=None):
        _Semaphore.__init__(self, value, verbose)
        # Upper bound enforced by release().
        self._initial_value = value

    def release(self):
        """Release the semaphore, incrementing the internal counter by one.

        If another thread is blocked in acquire() waiting for the counter to
        become positive again, one such thread is woken up.

        Raises ValueError, leaving the counter untouched, when the counter
        is already at (or above) the initial value.

        """
        # The base class attributes are name-mangled, hence the explicit
        # _Semaphore prefix.
        cond = self._Semaphore__cond
        with cond:
            if self._Semaphore__value >= self._initial_value:
                raise ValueError("Semaphore released too many times")
            self._Semaphore__value += 1
            cond.notify()
540
541
def Event(*args, **kwargs):
    """Create and return a new event.

    An event manages a flag: set() makes it true, clear() makes it false
    again, and wait() blocks until the flag is true.

    """
    event = _Event(*args, **kwargs)
    return event
551
class _Event(_Verbose):
    """Event object managing a single boolean flag.

       set() makes the flag true, clear() resets it to false, and wait()
       blocks until the flag is true.

    """

    # After Tim Peters' event class (without is_posted())

    def __init__(self, verbose=None):
        _Verbose.__init__(self, verbose)
        self.__cond = Condition(Lock())
        self.__flag = False

    def _reset_internal_locks(self):
        # private!  called by Thread._reset_internal_locks by _after_fork()
        # Re-initialise the condition in place around a brand new lock.
        self.__cond.__init__(Lock())

    def isSet(self):
        'Return true if and only if the internal flag is true.'
        return self.__flag

    is_set = isSet

    def set(self):
        """Set the internal flag to true and wake every waiting thread.

        Threads that call wait() while the flag is true do not block at all.

        """
        cond = self.__cond
        with cond:
            self.__flag = True
            cond.notify_all()

    def clear(self):
        """Reset the internal flag to false.

        From then on, threads calling wait() block until set() is called to
        make the flag true again.

        """
        with self.__cond:
            self.__flag = False

    def wait(self, timeout=None):
        """Block until the internal flag is true.

        Returns immediately when the flag is already true.  Otherwise waits
        until another thread calls set(), or until the optional timeout
        expires.

        *timeout*, when given and not None, should be a floating point
        number of seconds (or fractions thereof).

        The return value is the internal flag as it stands on exit, so it is
        always True unless a timeout was given and the operation timed out.

        """
        cond = self.__cond
        with cond:
            if not self.__flag:
                cond.wait(timeout)
            return self.__flag
616
# Helper to generate new thread names
# Numbering starts at 1 so that the first non-main thread has id 1.
_thread_numbers = _count(1)

def _counter():
    """Return the next unused thread sequence number (1, 2, 3, ...).

    Uses the next() builtin rather than the iterator's Python-2-only .next
    bound method; the function remains a zero-argument callable.
    """
    return next(_thread_numbers)

def _newname(template="Thread-%d"):
    """Return *template* filled in with the next thread sequence number."""
    return template % _counter()
622
# Active thread administration
# _active_limbo_lock is held around the updates of _active and _limbo made
# by Thread.start() and by the thread bootstrap code.
_active_limbo_lock = _allocate_lock()
_active = {}    # maps thread id to Thread object
# Threads for which start() has been called but which have not yet
# registered themselves in _active; maps each Thread object to itself.
_limbo = {}
627
628
629# Main class for threads
630
631class Thread(_Verbose):
632    """A class that represents a thread of control.
633
634    This class can be safely subclassed in a limited fashion.
635
636    """
637    __initialized = False
638    # Need to store a reference to sys.exc_info for printing
639    # out exceptions when a thread tries to use a global var. during interp.
640    # shutdown and thus raises an exception about trying to perform some
641    # operation on/with a NoneType
642    __exc_info = _sys.exc_info
643    # Keep sys.exc_clear too to clear the exception just before
644    # allowing .join() to return.
645    __exc_clear = _sys.exc_clear
646
647    def __init__(self, group=None, target=None, name=None,
648                 args=(), kwargs=None, verbose=None):
649        """This constructor should always be called with keyword arguments. Arguments are:
650
651        *group* should be None; reserved for future extension when a ThreadGroup
652        class is implemented.
653
654        *target* is the callable object to be invoked by the run()
655        method. Defaults to None, meaning nothing is called.
656
657        *name* is the thread name. By default, a unique name is constructed of
658        the form "Thread-N" where N is a small decimal number.
659
660        *args* is the argument tuple for the target invocation. Defaults to ().
661
662        *kwargs* is a dictionary of keyword arguments for the target
663        invocation. Defaults to {}.
664
665        If a subclass overrides the constructor, it must make sure to invoke
666        the base class constructor (Thread.__init__()) before doing anything
667        else to the thread.
668
669"""
670        assert group is None, "group argument must be None for now"
671        _Verbose.__init__(self, verbose)
672        if kwargs is None:
673            kwargs = {}
674        self.__target = target
675        self.__name = str(name or _newname())
676        self.__args = args
677        self.__kwargs = kwargs
678        self.__daemonic = self._set_daemon()
679        self.__ident = None
680        self.__started = Event()
681        self.__stopped = False
682        self.__block = Condition(Lock())
683        self.__initialized = True
684        # sys.stderr is not stored in the class like
685        # sys.exc_info since it can be changed between instances
686        self.__stderr = _sys.stderr
687
688    def _reset_internal_locks(self):
689        # private!  Called by _after_fork() to reset our internal locks as
690        # they may be in an invalid state leading to a deadlock or crash.
691        if hasattr(self, '_Thread__block'):  # DummyThread deletes self.__block
692            self.__block.__init__()
693        self.__started._reset_internal_locks()
694
695    @property
696    def _block(self):
697        # used by a unittest
698        return self.__block
699
700    def _set_daemon(self):
701        # Overridden in _MainThread and _DummyThread
702        return current_thread().daemon
703
704    def __repr__(self):
705        assert self.__initialized, "Thread.__init__() was not called"
706        status = "initial"
707        if self.__started.is_set():
708            status = "started"
709        if self.__stopped:
710            status = "stopped"
711        if self.__daemonic:
712            status += " daemon"
713        if self.__ident is not None:
714            status += " %s" % self.__ident
715        return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status)
716
717    def start(self):
718        """Start the thread's activity.
719
720        It must be called at most once per thread object. It arranges for the
721        object's run() method to be invoked in a separate thread of control.
722
723        This method will raise a RuntimeError if called more than once on the
724        same thread object.
725
726        """
727        if not self.__initialized:
728            raise RuntimeError("thread.__init__() not called")
729        if self.__started.is_set():
730            raise RuntimeError("threads can only be started once")
731        if __debug__:
732            self._note("%s.start(): starting thread", self)
733        with _active_limbo_lock:
734            _limbo[self] = self
735        try:
736            _start_new_thread(self.__bootstrap, ())
737        except Exception:
738            with _active_limbo_lock:
739                del _limbo[self]
740            raise
741        self.__started.wait()
742
    def run(self):
        """Method representing the thread's activity.

        You may override this method in a subclass. The standard run() method
        invokes the callable object passed to the object's constructor as the
        target argument, if any, with sequential and keyword arguments taken
        from the args and kwargs arguments, respectively.

        The stored target, args and kwargs are deleted from the thread object
        afterwards, whether or not the target raised.

        """
        try:
            # A false target (such as the default None) means there is
            # nothing to call.
            if self.__target:
                self.__target(*self.__args, **self.__kwargs)
        finally:
            # Avoid a refcycle if the thread is running a function with
            # an argument that has a member that points to the thread.
            del self.__target, self.__args, self.__kwargs
759
760    def __bootstrap(self):
761        # Wrapper around the real bootstrap code that ignores
762        # exceptions during interpreter cleanup.  Those typically
763        # happen when a daemon thread wakes up at an unfortunate
764        # moment, finds the world around it destroyed, and raises some
765        # random exception *** while trying to report the exception in
766        # __bootstrap_inner() below ***.  Those random exceptions
767        # don't help anybody, and they confuse users, so we suppress
768        # them.  We suppress them only when it appears that the world
769        # indeed has already been destroyed, so that exceptions in
770        # __bootstrap_inner() during normal business hours are properly
771        # reported.  Also, we only suppress them for daemonic threads;
772        # if a non-daemonic encounters this, something else is wrong.
773        try:
774            self.__bootstrap_inner()
775        except:
776            if self.__daemonic and _sys is None:
777                return
778            raise
779
780    def _set_ident(self):
781        self.__ident = _get_ident()
782
    def __bootstrap_inner(self):
        """Run the thread body together with its start-up and tear-down bookkeeping.

        Records the thread's ident, sets the started event, moves the thread
        from _limbo to _active, installs the module-level trace/profile hooks
        when they are set, and calls run().  SystemExit raised by run() is
        swallowed; any other exception is reported on stderr instead of being
        propagated.  On the way out the thread is always marked stopped and
        its entry is removed from _active.
        """
        try:
            # Publish the ident and the "started" state before registering the
            # thread in _active.
            self._set_ident()
            self.__started.set()
            with _active_limbo_lock:
                _active[self.__ident] = self
                del _limbo[self]
            if __debug__:
                self._note("%s.__bootstrap(): thread started", self)

            # Install the hooks in this thread before run() is entered.
            if _trace_hook:
                self._note("%s.__bootstrap(): registering trace hook", self)
                _sys.settrace(_trace_hook)
            if _profile_hook:
                self._note("%s.__bootstrap(): registering profile hook", self)
                _sys.setprofile(_profile_hook)

            try:
                self.run()
            except SystemExit:
                # SystemExit only ends this thread; nothing is printed.
                if __debug__:
                    self._note("%s.__bootstrap(): raised SystemExit", self)
            except:
                if __debug__:
                    self._note("%s.__bootstrap(): unhandled exception", self)
                # If sys.stderr is no more (most likely from interpreter
                # shutdown) use self.__stderr.  Otherwise still use sys (as in
                # _sys) in case sys.stderr was redefined since the creation of
                # self.
                if _sys and _sys.stderr is not None:
                    print>>_sys.stderr, ("Exception in thread %s:\n%s" %
                                         (self.name, _format_exc()))
                elif self.__stderr is not None:
                    # Do the best job possible w/o a huge amt. of code to
                    # approximate a traceback (code ideas from
                    # Lib/traceback.py)
                    exc_type, exc_value, exc_tb = self.__exc_info()
                    try:
                        print>>self.__stderr, (
                            "Exception in thread " + self.name +
                            " (most likely raised during interpreter shutdown):")
                        print>>self.__stderr, (
                            "Traceback (most recent call last):")
                        # Walk the traceback chain, one "File ..." line per
                        # frame.
                        while exc_tb:
                            print>>self.__stderr, (
                                '  File "%s", line %s, in %s' %
                                (exc_tb.tb_frame.f_code.co_filename,
                                    exc_tb.tb_lineno,
                                    exc_tb.tb_frame.f_code.co_name))
                            exc_tb = exc_tb.tb_next
                        print>>self.__stderr, ("%s: %s" % (exc_type, exc_value))
                    # Make sure that exc_tb gets deleted since it is a memory
                    # hog; deleting everything else is just for thoroughness
                    finally:
                        del exc_type, exc_value, exc_tb
            else:
                if __debug__:
                    self._note("%s.__bootstrap(): normal return", self)
            finally:
                # Prevent a race in
                # test_threading.test_no_refcycle_through_target when
                # the exception keeps the target alive past when we
                # assert that it's dead.
                self.__exc_clear()
        finally:
            # Runs however the body above ended: mark the thread stopped and
            # unregister it while holding the registry lock.
            with _active_limbo_lock:
                self.__stop()
                try:
                    # We don't call self.__delete() because it also
                    # grabs _active_limbo_lock.
                    del _active[_get_ident()]
                except:
                    # Deliberately best-effort: any failure to remove the
                    # entry is ignored.
                    pass
856
857    def __stop(self):
858        # DummyThreads delete self.__block, but they have no waiters to
859        # notify anyway (join() is forbidden on them).
860        if not hasattr(self, '_Thread__block'):
861            return
862        self.__block.acquire()
863        self.__stopped = True
864        self.__block.notify_all()
865        self.__block.release()
866
    def __delete(self):
        """Remove current thread from the dict of currently running threads.

        The entry keyed by the calling thread's ident is deleted from _active
        while _active_limbo_lock is held.  A KeyError for a missing entry is
        re-raised unless 'dummy_threading' is present in sys.modules.
        """

        # Notes about running with dummy_thread:
        #
        # Must take care to not raise an exception if dummy_thread is being
        # used (and thus this module is being used as an instance of
        # dummy_threading).  dummy_thread.get_ident() always returns -1 since
        # there is only one thread if dummy_thread is being used.  Thus
        # len(_active) is always <= 1 here, and any Thread instance created
        # overwrites the (if any) thread currently registered in _active.
        #
        # An instance of _MainThread is always created by 'threading'.  This
        # gets overwritten the instant an instance of Thread is created; both
        # threads return -1 from dummy_thread.get_ident() and thus have the
        # same key in the dict.  So when the _MainThread instance created by
        # 'threading' tries to clean itself up when atexit calls this method
        # it gets a KeyError if another Thread instance was created.
        #
        # This all means that KeyError from trying to delete something from
        # _active if dummy_threading is being used is a red herring.  But
        # since it isn't if dummy_threading is *not* being used then don't
        # hide the exception.

        try:
            with _active_limbo_lock:
                del _active[_get_ident()]
                # There must not be any python code between the previous line
                # and after the lock is released.  Otherwise a tracing function
                # could try to acquire the lock again in the same thread, (in
                # current_thread()), and would block.
        except KeyError:
            # Only tolerated when this module is running as dummy_threading.
            if 'dummy_threading' not in _sys.modules:
                raise
901
902    def join(self, timeout=None):
903        """Wait until the thread terminates.
904
905        This blocks the calling thread until the thread whose join() method is
906        called terminates -- either normally or through an unhandled exception
907        or until the optional timeout occurs.
908
909        When the timeout argument is present and not None, it should be a
910        floating point number specifying a timeout for the operation in seconds
911        (or fractions thereof). As join() always returns None, you must call
912        isAlive() after join() to decide whether a timeout happened -- if the
913        thread is still alive, the join() call timed out.
914
915        When the timeout argument is not present or None, the operation will
916        block until the thread terminates.
917
918        A thread can be join()ed many times.
919
920        join() raises a RuntimeError if an attempt is made to join the current
921        thread as that would cause a deadlock. It is also an error to join() a
922        thread before it has been started and attempts to do so raises the same
923        exception.
924
925        """
926        if not self.__initialized:
927            raise RuntimeError("Thread.__init__() not called")
928        if not self.__started.is_set():
929            raise RuntimeError("cannot join thread before it is started")
930        if self is current_thread():
931            raise RuntimeError("cannot join current thread")
932
933        if __debug__:
934            if not self.__stopped:
935                self._note("%s.join(): waiting until thread stops", self)
936        self.__block.acquire()
937        try:
938            if timeout is None:
939                while not self.__stopped:
940                    self.__block.wait()
941                if __debug__:
942                    self._note("%s.join(): thread stopped", self)
943            else:
944                deadline = _time() + timeout
945                while not self.__stopped:
946                    delay = deadline - _time()
947                    if delay <= 0:
948                        if __debug__:
949                            self._note("%s.join(): timed out", self)
950                        break
951                    self.__block.wait(delay)
952                else:
953                    if __debug__:
954                        self._note("%s.join(): thread stopped", self)
955        finally:
956            self.__block.release()
957
958    @property
959    def name(self):
960        """A string used for identification purposes only.
961
962        It has no semantics. Multiple threads may be given the same name. The
963        initial name is set by the constructor.
964
965        """
966        assert self.__initialized, "Thread.__init__() not called"
967        return self.__name
968
969    @name.setter
970    def name(self, name):
971        assert self.__initialized, "Thread.__init__() not called"
972        self.__name = str(name)
973
974    @property
975    def ident(self):
976        """Thread identifier of this thread or None if it has not been started.
977
978        This is a nonzero integer. See the thread.get_ident() function. Thread
979        identifiers may be recycled when a thread exits and another thread is
980        created. The identifier is available even after the thread has exited.
981
982        """
983        assert self.__initialized, "Thread.__init__() not called"
984        return self.__ident
985
986    def isAlive(self):
987        """Return whether the thread is alive.
988
989        This method returns True just before the run() method starts until just
990        after the run() method terminates. The module function enumerate()
991        returns a list of all alive threads.
992
993        """
994        assert self.__initialized, "Thread.__init__() not called"
995        return self.__started.is_set() and not self.__stopped
996
997    is_alive = isAlive
998
999    @property
1000    def daemon(self):
1001        """A boolean value indicating whether this thread is a daemon thread (True) or not (False).
1002
1003        This must be set before start() is called, otherwise RuntimeError is
1004        raised. Its initial value is inherited from the creating thread; the
1005        main thread is not a daemon thread and therefore all threads created in
1006        the main thread default to daemon = False.
1007
1008        The entire Python program exits when no alive non-daemon threads are
1009        left.
1010
1011        """
1012        assert self.__initialized, "Thread.__init__() not called"
1013        return self.__daemonic
1014
1015    @daemon.setter
1016    def daemon(self, daemonic):
1017        if not self.__initialized:
1018            raise RuntimeError("Thread.__init__() not called")
1019        if self.__started.is_set():
1020            raise RuntimeError("cannot set daemon status of active thread");
1021        self.__daemonic = daemonic
1022
1023    def isDaemon(self):
1024        return self.daemon
1025
1026    def setDaemon(self, daemonic):
1027        self.daemon = daemonic
1028
1029    def getName(self):
1030        return self.name
1031
1032    def setName(self, name):
1033        self.name = name
1034
1035# The timer class was contributed by Itamar Shtull-Trauring
1036
def Timer(*args, **kwargs):
    """Build and return a new Timer object.

    A timer runs a function once a given number of seconds has passed:

        t = Timer(30.0, f, args=[], kwargs={})
        t.start()
        t.cancel()     # stop the timer's action if it's still waiting

    All positional and keyword arguments are handed to _Timer unchanged.

    """
    timer = _Timer(*args, **kwargs)
    return timer
1048
class _Timer(Thread):
    """Call a function after a specified number of seconds:

            t = Timer(30.0, f, args=[], kwargs={})
            t.start()
            t.cancel()     # stop the timer's action if it's still waiting

    Attributes set by the constructor:
        interval -- seconds to wait before calling function
        function -- the callable to invoke
        args     -- positional arguments for function (a fresh list if omitted)
        kwargs   -- keyword arguments for function (a fresh dict if omitted)
        finished -- Event set by cancel() and at the end of run()
    """

    def __init__(self, interval, function, args=None, kwargs=None):
        Thread.__init__(self)
        self.interval = interval
        self.function = function
        # None stands in for "not given" so that each timer gets its own
        # containers; a literal []/{} default would be one object shared by
        # every timer created without explicit arguments.
        self.args = args if args is not None else []
        self.kwargs = kwargs if kwargs is not None else {}
        self.finished = Event()

    def cancel(self):
        """Stop the timer if it hasn't finished yet"""
        self.finished.set()

    def run(self):
        """Wait for the interval, then call the function unless cancelled."""
        # wait() returns early when cancel() sets the event.
        self.finished.wait(self.interval)
        if not self.finished.is_set():
            self.function(*self.args, **self.kwargs)
        self.finished.set()
1075
1076# Special thread class to represent the main thread
1077# This is garbage collected through an exit handler
1078
class _MainThread(Thread):
    """Thread object standing in for the thread that imported this module."""

    def __init__(self):
        """Register the calling thread as an already-started "MainThread"."""
        Thread.__init__(self, name="MainThread")
        # This thread is already running, so flag it as started and record
        # its ident right away instead of going through start().
        self._Thread__started.set()
        self._set_ident()
        with _active_limbo_lock:
            _active[_get_ident()] = self

    def _set_daemon(self):
        """The main thread is never a daemon."""
        return False

    def _exitfunc(self):
        """Join every remaining non-daemon thread, then unregister this one."""
        self._Thread__stop()
        pending = _pickSomeNonDaemonThread()
        if pending and __debug__:
            self._note("%s: waiting for other threads", self)
        # Keep picking and joining until no alive non-daemon thread is left.
        while pending:
            pending.join()
            pending = _pickSomeNonDaemonThread()
        if __debug__:
            self._note("%s: exiting", self)
        self._Thread__delete()
1103
def _pickSomeNonDaemonThread():
    """Return one alive non-daemon thread from enumerate(), or None."""
    for candidate in enumerate():
        if candidate.daemon:
            continue
        if candidate.is_alive():
            return candidate
    return None
1109
1110
1111# Dummy thread class to represent threads not started here.
1112# These aren't garbage collected when they die, nor can they be waited for.
1113# If they invoke anything in threading.py that calls current_thread(), they
1114# leave an entry in the _active dict forever after.
1115# Their purpose is to return *something* from current_thread().
# They are marked as daemon threads so we won't wait for them
# when we exit (conforming to previous semantics).
1118
class _DummyThread(Thread):
    """Placeholder Thread object for a thread that this module did not start."""

    def __init__(self):
        """Register the calling thread under a generated "Dummy-N" name."""
        dummy_name = _newname("Dummy-%d")
        Thread.__init__(self, name=dummy_name)

        # Thread.__block consumes an OS-level locking primitive, which
        # can never be used by a _DummyThread.  Since a _DummyThread
        # instance is immortal, that's bad, so release this resource.
        del self._Thread__block

        # The thread is already running: flag it as started, record its
        # ident and add it to the registry.
        self._Thread__started.set()
        self._set_ident()
        with _active_limbo_lock:
            _active[_get_ident()] = self

    def _set_daemon(self):
        """Dummy threads are always daemonic."""
        return True

    def join(self, timeout=None):
        """Joining a dummy thread is not supported; always fails the assertion."""
        assert False, "cannot join a dummy thread"
1139
1140
1141# Global API functions
1142
def currentThread():
    """Return the Thread object that belongs to the caller's thread of control.

    When the calling thread was not created through the threading module, a
    dummy thread object with limited functionality is created and returned.

    """
    ident = _get_ident()
    try:
        found = _active[ident]
    except KeyError:
        # Unknown thread: build a dummy object to represent it.
        found = _DummyThread()
    return found

current_thread = currentThread
1157
def activeCount():
    """Return how many Thread objects are currently alive.

    The number matches the length of the list that enumerate() returns.

    """
    with _active_limbo_lock:
        running = len(_active)
        starting = len(_limbo)
        return running + starting

active_count = activeCount
1169
def _enumerate():
    """Lock-free variant of enumerate(); for internal use only."""
    running = _active.values()
    starting = _limbo.values()
    return running + starting
1173
def enumerate():
    """Return a list of every Thread object that is currently alive.

    Daemonic threads, dummy thread objects created by current_thread() and
    the main thread are all included.  Terminated threads and threads that
    have not been started yet are left out.

    """
    with _active_limbo_lock:
        running = _active.values()
        starting = _limbo.values()
        return running + starting
1184
1185from thread import stack_size
1186
# Create the main thread object,
# and make it available for the interpreter
# (Py_Main) as threading._shutdown.
# Note that the _MainThread instance is constructed here, at import time,
# and _shutdown is its bound _exitfunc method.

_shutdown = _MainThread()._exitfunc
1192
1193# get thread-local implementation, either from the thread
1194# module, or from the python fallback
1195
1196try:
1197    from thread import _local as local
1198except ImportError:
1199    from _threading_local import local
1200
1201
def _after_fork():
    """Rebuild the module's thread registry in a freshly forked child."""
    # This function is called by Python/ceval.c:PyEval_ReInitThreads which
    # is called from PyOS_AfterFork.  Here we cleanup threading module state
    # that should not exist after a fork.

    # Reset _active_limbo_lock, in case we forked while the lock was held
    # by another (non-forked) thread.  http://bugs.python.org/issue874900
    global _active_limbo_lock
    _active_limbo_lock = _allocate_lock()

    # fork() only copied the current thread; clear references to others.
    survivor = current_thread()
    surviving = {}
    with _active_limbo_lock:
        for candidate in _enumerate():
            # Any lock/condition variable may be currently locked or in an
            # invalid state, so we reinitialize them.
            if hasattr(candidate, '_reset_internal_locks'):
                candidate._reset_internal_locks()
            if candidate is not survivor:
                # All the others are already stopped.
                candidate._Thread__stop()
                continue
            # There is only one active thread. We reset the ident to
            # its new value since it can have changed.
            ident = _get_ident()
            candidate._Thread__ident = ident
            surviving[ident] = candidate

        _limbo.clear()
        _active.clear()
        _active.update(surviving)
        assert len(_active) == 1
1235
1236
1237# Self-test code
1238
1239def _test():
1240
1241    class BoundedQueue(_Verbose):
1242
1243        def __init__(self, limit):
1244            _Verbose.__init__(self)
1245            self.mon = RLock()
1246            self.rc = Condition(self.mon)
1247            self.wc = Condition(self.mon)
1248            self.limit = limit
1249            self.queue = _deque()
1250
1251        def put(self, item):
1252            self.mon.acquire()
1253            while len(self.queue) >= self.limit:
1254                self._note("put(%s): queue full", item)
1255                self.wc.wait()
1256            self.queue.append(item)
1257            self._note("put(%s): appended, length now %d",
1258                       item, len(self.queue))
1259            self.rc.notify()
1260            self.mon.release()
1261
1262        def get(self):
1263            self.mon.acquire()
1264            while not self.queue:
1265                self._note("get(): queue empty")
1266                self.rc.wait()
1267            item = self.queue.popleft()
1268            self._note("get(): got %s, %d left", item, len(self.queue))
1269            self.wc.notify()
1270            self.mon.release()
1271            return item
1272
1273    class ProducerThread(Thread):
1274
1275        def __init__(self, queue, quota):
1276            Thread.__init__(self, name="Producer")
1277            self.queue = queue
1278            self.quota = quota
1279
1280        def run(self):
1281            from random import random
1282            counter = 0
1283            while counter < self.quota:
1284                counter = counter + 1
1285                self.queue.put("%s.%d" % (self.name, counter))
1286                _sleep(random() * 0.00001)
1287
1288
1289    class ConsumerThread(Thread):
1290
1291        def __init__(self, queue, count):
1292            Thread.__init__(self, name="Consumer")
1293            self.queue = queue
1294            self.count = count
1295
1296        def run(self):
1297            while self.count > 0:
1298                item = self.queue.get()
1299                print item
1300                self.count = self.count - 1
1301
1302    NP = 3
1303    QL = 4
1304    NI = 5
1305
1306    Q = BoundedQueue(QL)
1307    P = []
1308    for i in range(NP):
1309        t = ProducerThread(Q, NI)
1310        t.name = ("Producer-%d" % (i+1))
1311        P.append(t)
1312    C = ConsumerThread(Q, NI*NP)
1313    for t in P:
1314        t.start()
1315        _sleep(0.000001)
1316    C.start()
1317    for t in P:
1318        t.join()
1319    C.join()
1320
# Run the self-test when this file is executed as a script.
if __name__ == '__main__':
    _test()
1323