1"""Thread module emulating a subset of Java's threading model.""" 2 3import os as _os 4import sys as _sys 5import _thread 6import warnings 7 8from time import monotonic as _time 9from _weakrefset import WeakSet 10from itertools import count as _count 11try: 12 from _collections import deque as _deque 13except ImportError: 14 from collections import deque as _deque 15 16# Note regarding PEP 8 compliant names 17# This threading model was originally inspired by Java, and inherited 18# the convention of camelCase function and method names from that 19# language. Those original names are not in any imminent danger of 20# being deprecated (even for Py3k),so this module provides them as an 21# alias for the PEP 8 compliant names 22# Note that using the new PEP 8 compliant names facilitates substitution 23# with the multiprocessing module, which doesn't provide the old 24# Java inspired names. 25 26__all__ = ['get_ident', 'active_count', 'Condition', 'current_thread', 27 'enumerate', 'main_thread', 'TIMEOUT_MAX', 28 'Event', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread', 29 'Barrier', 'BrokenBarrierError', 'Timer', 'ThreadError', 30 'setprofile', 'settrace', 'local', 'stack_size', 31 'excepthook', 'ExceptHookArgs', 'gettrace', 'getprofile', 32 'setprofile_all_threads','settrace_all_threads'] 33 34# Rename some stuff so "from threading import *" is safe 35_start_joinable_thread = _thread.start_joinable_thread 36_daemon_threads_allowed = _thread.daemon_threads_allowed 37_allocate_lock = _thread.allocate_lock 38_LockType = _thread.LockType 39_thread_shutdown = _thread._shutdown 40_make_thread_handle = _thread._make_thread_handle 41_ThreadHandle = _thread._ThreadHandle 42get_ident = _thread.get_ident 43_get_main_thread_ident = _thread._get_main_thread_ident 44_is_main_interpreter = _thread._is_main_interpreter 45try: 46 get_native_id = _thread.get_native_id 47 _HAVE_THREAD_NATIVE_ID = True 48 __all__.append('get_native_id') 49except AttributeError: 50 _HAVE_THREAD_NATIVE_ID = False 51ThreadError = _thread.error 52try: 53 _CRLock = _thread.RLock 54except AttributeError: 55 _CRLock = None 56TIMEOUT_MAX = _thread.TIMEOUT_MAX 57del _thread 58 59# get thread-local implementation, either from the thread 60# module, or from the python fallback 61 62try: 63 from _thread import _local as local 64except ImportError: 65 from _threading_local import local 66 67# Support for profile and trace hooks 68 69_profile_hook = None 70_trace_hook = None 71 72def setprofile(func): 73 """Set a profile function for all threads started from the threading module. 74 75 The func will be passed to sys.setprofile() for each thread, before its 76 run() method is called. 77 """ 78 global _profile_hook 79 _profile_hook = func 80 81def setprofile_all_threads(func): 82 """Set a profile function for all threads started from the threading module 83 and all Python threads that are currently executing. 84 85 The func will be passed to sys.setprofile() for each thread, before its 86 run() method is called. 87 """ 88 setprofile(func) 89 _sys._setprofileallthreads(func) 90 91def getprofile(): 92 """Get the profiler function as set by threading.setprofile().""" 93 return _profile_hook 94 95def settrace(func): 96 """Set a trace function for all threads started from the threading module. 97 98 The func will be passed to sys.settrace() for each thread, before its run() 99 method is called. 100 """ 101 global _trace_hook 102 _trace_hook = func 103 104def settrace_all_threads(func): 105 """Set a trace function for all threads started from the threading module 106 and all Python threads that are currently executing. 107 108 The func will be passed to sys.settrace() for each thread, before its run() 109 method is called. 110 """ 111 settrace(func) 112 _sys._settraceallthreads(func) 113 114def gettrace(): 115 """Get the trace function as set by threading.settrace().""" 116 return _trace_hook 117 118# Synchronization classes 119 120Lock = _LockType 121 122def RLock(*args, **kwargs): 123 """Factory function that returns a new reentrant lock. 124 125 A reentrant lock must be released by the thread that acquired it. Once a 126 thread has acquired a reentrant lock, the same thread may acquire it again 127 without blocking; the thread must release it once for each time it has 128 acquired it. 129 130 """ 131 if args or kwargs: 132 warnings.warn( 133 'Passing arguments to RLock is deprecated and will be removed in 3.15', 134 DeprecationWarning, 135 stacklevel=2, 136 ) 137 if _CRLock is None: 138 return _PyRLock(*args, **kwargs) 139 return _CRLock(*args, **kwargs) 140 141class _RLock: 142 """This class implements reentrant lock objects. 143 144 A reentrant lock must be released by the thread that acquired it. Once a 145 thread has acquired a reentrant lock, the same thread may acquire it 146 again without blocking; the thread must release it once for each time it 147 has acquired it. 148 149 """ 150 151 def __init__(self): 152 self._block = _allocate_lock() 153 self._owner = None 154 self._count = 0 155 156 def __repr__(self): 157 owner = self._owner 158 try: 159 owner = _active[owner].name 160 except KeyError: 161 pass 162 return "<%s %s.%s object owner=%r count=%d at %s>" % ( 163 "locked" if self._block.locked() else "unlocked", 164 self.__class__.__module__, 165 self.__class__.__qualname__, 166 owner, 167 self._count, 168 hex(id(self)) 169 ) 170 171 def _at_fork_reinit(self): 172 self._block._at_fork_reinit() 173 self._owner = None 174 self._count = 0 175 176 def acquire(self, blocking=True, timeout=-1): 177 """Acquire a lock, blocking or non-blocking. 178 179 When invoked without arguments: if this thread already owns the lock, 180 increment the recursion level by one, and return immediately. Otherwise, 181 if another thread owns the lock, block until the lock is unlocked. Once 182 the lock is unlocked (not owned by any thread), then grab ownership, set 183 the recursion level to one, and return. If more than one thread is 184 blocked waiting until the lock is unlocked, only one at a time will be 185 able to grab ownership of the lock. There is no return value in this 186 case. 187 188 When invoked with the blocking argument set to true, do the same thing 189 as when called without arguments, and return true. 190 191 When invoked with the blocking argument set to false, do not block. If a 192 call without an argument would block, return false immediately; 193 otherwise, do the same thing as when called without arguments, and 194 return true. 195 196 When invoked with the floating-point timeout argument set to a positive 197 value, block for at most the number of seconds specified by timeout 198 and as long as the lock cannot be acquired. Return true if the lock has 199 been acquired, false if the timeout has elapsed. 200 201 """ 202 me = get_ident() 203 if self._owner == me: 204 self._count += 1 205 return 1 206 rc = self._block.acquire(blocking, timeout) 207 if rc: 208 self._owner = me 209 self._count = 1 210 return rc 211 212 __enter__ = acquire 213 214 def release(self): 215 """Release a lock, decrementing the recursion level. 216 217 If after the decrement it is zero, reset the lock to unlocked (not owned 218 by any thread), and if any other threads are blocked waiting for the 219 lock to become unlocked, allow exactly one of them to proceed. If after 220 the decrement the recursion level is still nonzero, the lock remains 221 locked and owned by the calling thread. 222 223 Only call this method when the calling thread owns the lock. A 224 RuntimeError is raised if this method is called when the lock is 225 unlocked. 226 227 There is no return value. 228 229 """ 230 if self._owner != get_ident(): 231 raise RuntimeError("cannot release un-acquired lock") 232 self._count = count = self._count - 1 233 if not count: 234 self._owner = None 235 self._block.release() 236 237 def __exit__(self, t, v, tb): 238 self.release() 239 240 # Internal methods used by condition variables 241 242 def _acquire_restore(self, state): 243 self._block.acquire() 244 self._count, self._owner = state 245 246 def _release_save(self): 247 if self._count == 0: 248 raise RuntimeError("cannot release un-acquired lock") 249 count = self._count 250 self._count = 0 251 owner = self._owner 252 self._owner = None 253 self._block.release() 254 return (count, owner) 255 256 def _is_owned(self): 257 return self._owner == get_ident() 258 259 # Internal method used for reentrancy checks 260 261 def _recursion_count(self): 262 if self._owner != get_ident(): 263 return 0 264 return self._count 265 266_PyRLock = _RLock 267 268 269class Condition: 270 """Class that implements a condition variable. 271 272 A condition variable allows one or more threads to wait until they are 273 notified by another thread. 274 275 If the lock argument is given and not None, it must be a Lock or RLock 276 object, and it is used as the underlying lock. Otherwise, a new RLock object 277 is created and used as the underlying lock. 278 279 """ 280 281 def __init__(self, lock=None): 282 if lock is None: 283 lock = RLock() 284 self._lock = lock 285 # Export the lock's acquire() and release() methods 286 self.acquire = lock.acquire 287 self.release = lock.release 288 # If the lock defines _release_save() and/or _acquire_restore(), 289 # these override the default implementations (which just call 290 # release() and acquire() on the lock). Ditto for _is_owned(). 291 if hasattr(lock, '_release_save'): 292 self._release_save = lock._release_save 293 if hasattr(lock, '_acquire_restore'): 294 self._acquire_restore = lock._acquire_restore 295 if hasattr(lock, '_is_owned'): 296 self._is_owned = lock._is_owned 297 self._waiters = _deque() 298 299 def _at_fork_reinit(self): 300 self._lock._at_fork_reinit() 301 self._waiters.clear() 302 303 def __enter__(self): 304 return self._lock.__enter__() 305 306 def __exit__(self, *args): 307 return self._lock.__exit__(*args) 308 309 def __repr__(self): 310 return "<Condition(%s, %d)>" % (self._lock, len(self._waiters)) 311 312 def _release_save(self): 313 self._lock.release() # No state to save 314 315 def _acquire_restore(self, x): 316 self._lock.acquire() # Ignore saved state 317 318 def _is_owned(self): 319 # Return True if lock is owned by current_thread. 320 # This method is called only if _lock doesn't have _is_owned(). 321 if self._lock.acquire(False): 322 self._lock.release() 323 return False 324 else: 325 return True 326 327 def wait(self, timeout=None): 328 """Wait until notified or until a timeout occurs. 329 330 If the calling thread has not acquired the lock when this method is 331 called, a RuntimeError is raised. 332 333 This method releases the underlying lock, and then blocks until it is 334 awakened by a notify() or notify_all() call for the same condition 335 variable in another thread, or until the optional timeout occurs. Once 336 awakened or timed out, it re-acquires the lock and returns. 337 338 When the timeout argument is present and not None, it should be a 339 floating-point number specifying a timeout for the operation in seconds 340 (or fractions thereof). 341 342 When the underlying lock is an RLock, it is not released using its 343 release() method, since this may not actually unlock the lock when it 344 was acquired multiple times recursively. Instead, an internal interface 345 of the RLock class is used, which really unlocks it even when it has 346 been recursively acquired several times. Another internal interface is 347 then used to restore the recursion level when the lock is reacquired. 348 349 """ 350 if not self._is_owned(): 351 raise RuntimeError("cannot wait on un-acquired lock") 352 waiter = _allocate_lock() 353 waiter.acquire() 354 self._waiters.append(waiter) 355 saved_state = self._release_save() 356 gotit = False 357 try: # restore state no matter what (e.g., KeyboardInterrupt) 358 if timeout is None: 359 waiter.acquire() 360 gotit = True 361 else: 362 if timeout > 0: 363 gotit = waiter.acquire(True, timeout) 364 else: 365 gotit = waiter.acquire(False) 366 return gotit 367 finally: 368 self._acquire_restore(saved_state) 369 if not gotit: 370 try: 371 self._waiters.remove(waiter) 372 except ValueError: 373 pass 374 375 def wait_for(self, predicate, timeout=None): 376 """Wait until a condition evaluates to True. 377 378 predicate should be a callable which result will be interpreted as a 379 boolean value. A timeout may be provided giving the maximum time to 380 wait. 381 382 """ 383 endtime = None 384 waittime = timeout 385 result = predicate() 386 while not result: 387 if waittime is not None: 388 if endtime is None: 389 endtime = _time() + waittime 390 else: 391 waittime = endtime - _time() 392 if waittime <= 0: 393 break 394 self.wait(waittime) 395 result = predicate() 396 return result 397 398 def notify(self, n=1): 399 """Wake up one or more threads waiting on this condition, if any. 400 401 If the calling thread has not acquired the lock when this method is 402 called, a RuntimeError is raised. 403 404 This method wakes up at most n of the threads waiting for the condition 405 variable; it is a no-op if no threads are waiting. 406 407 """ 408 if not self._is_owned(): 409 raise RuntimeError("cannot notify on un-acquired lock") 410 waiters = self._waiters 411 while waiters and n > 0: 412 waiter = waiters[0] 413 try: 414 waiter.release() 415 except RuntimeError: 416 # gh-92530: The previous call of notify() released the lock, 417 # but was interrupted before removing it from the queue. 418 # It can happen if a signal handler raises an exception, 419 # like CTRL+C which raises KeyboardInterrupt. 420 pass 421 else: 422 n -= 1 423 try: 424 waiters.remove(waiter) 425 except ValueError: 426 pass 427 428 def notify_all(self): 429 """Wake up all threads waiting on this condition. 430 431 If the calling thread has not acquired the lock when this method 432 is called, a RuntimeError is raised. 433 434 """ 435 self.notify(len(self._waiters)) 436 437 def notifyAll(self): 438 """Wake up all threads waiting on this condition. 439 440 This method is deprecated, use notify_all() instead. 441 442 """ 443 import warnings 444 warnings.warn('notifyAll() is deprecated, use notify_all() instead', 445 DeprecationWarning, stacklevel=2) 446 self.notify_all() 447 448 449class Semaphore: 450 """This class implements semaphore objects. 451 452 Semaphores manage a counter representing the number of release() calls minus 453 the number of acquire() calls, plus an initial value. The acquire() method 454 blocks if necessary until it can return without making the counter 455 negative. If not given, value defaults to 1. 456 457 """ 458 459 # After Tim Peters' semaphore class, but not quite the same (no maximum) 460 461 def __init__(self, value=1): 462 if value < 0: 463 raise ValueError("semaphore initial value must be >= 0") 464 self._cond = Condition(Lock()) 465 self._value = value 466 467 def __repr__(self): 468 cls = self.__class__ 469 return (f"<{cls.__module__}.{cls.__qualname__} at {id(self):#x}:" 470 f" value={self._value}>") 471 472 def acquire(self, blocking=True, timeout=None): 473 """Acquire a semaphore, decrementing the internal counter by one. 474 475 When invoked without arguments: if the internal counter is larger than 476 zero on entry, decrement it by one and return immediately. If it is zero 477 on entry, block, waiting until some other thread has called release() to 478 make it larger than zero. This is done with proper interlocking so that 479 if multiple acquire() calls are blocked, release() will wake exactly one 480 of them up. The implementation may pick one at random, so the order in 481 which blocked threads are awakened should not be relied on. There is no 482 return value in this case. 483 484 When invoked with blocking set to true, do the same thing as when called 485 without arguments, and return true. 486 487 When invoked with blocking set to false, do not block. If a call without 488 an argument would block, return false immediately; otherwise, do the 489 same thing as when called without arguments, and return true. 490 491 When invoked with a timeout other than None, it will block for at 492 most timeout seconds. If acquire does not complete successfully in 493 that interval, return false. Return true otherwise. 494 495 """ 496 if not blocking and timeout is not None: 497 raise ValueError("can't specify timeout for non-blocking acquire") 498 rc = False 499 endtime = None 500 with self._cond: 501 while self._value == 0: 502 if not blocking: 503 break 504 if timeout is not None: 505 if endtime is None: 506 endtime = _time() + timeout 507 else: 508 timeout = endtime - _time() 509 if timeout <= 0: 510 break 511 self._cond.wait(timeout) 512 else: 513 self._value -= 1 514 rc = True 515 return rc 516 517 __enter__ = acquire 518 519 def release(self, n=1): 520 """Release a semaphore, incrementing the internal counter by one or more. 521 522 When the counter is zero on entry and another thread is waiting for it 523 to become larger than zero again, wake up that thread. 524 525 """ 526 if n < 1: 527 raise ValueError('n must be one or more') 528 with self._cond: 529 self._value += n 530 self._cond.notify(n) 531 532 def __exit__(self, t, v, tb): 533 self.release() 534 535 536class BoundedSemaphore(Semaphore): 537 """Implements a bounded semaphore. 538 539 A bounded semaphore checks to make sure its current value doesn't exceed its 540 initial value. If it does, ValueError is raised. In most situations 541 semaphores are used to guard resources with limited capacity. 542 543 If the semaphore is released too many times it's a sign of a bug. If not 544 given, value defaults to 1. 545 546 Like regular semaphores, bounded semaphores manage a counter representing 547 the number of release() calls minus the number of acquire() calls, plus an 548 initial value. The acquire() method blocks if necessary until it can return 549 without making the counter negative. If not given, value defaults to 1. 550 551 """ 552 553 def __init__(self, value=1): 554 super().__init__(value) 555 self._initial_value = value 556 557 def __repr__(self): 558 cls = self.__class__ 559 return (f"<{cls.__module__}.{cls.__qualname__} at {id(self):#x}:" 560 f" value={self._value}/{self._initial_value}>") 561 562 def release(self, n=1): 563 """Release a semaphore, incrementing the internal counter by one or more. 564 565 When the counter is zero on entry and another thread is waiting for it 566 to become larger than zero again, wake up that thread. 567 568 If the number of releases exceeds the number of acquires, 569 raise a ValueError. 570 571 """ 572 if n < 1: 573 raise ValueError('n must be one or more') 574 with self._cond: 575 if self._value + n > self._initial_value: 576 raise ValueError("Semaphore released too many times") 577 self._value += n 578 self._cond.notify(n) 579 580 581class Event: 582 """Class implementing event objects. 583 584 Events manage a flag that can be set to true with the set() method and reset 585 to false with the clear() method. The wait() method blocks until the flag is 586 true. The flag is initially false. 587 588 """ 589 590 # After Tim Peters' event class (without is_posted()) 591 592 def __init__(self): 593 self._cond = Condition(Lock()) 594 self._flag = False 595 596 def __repr__(self): 597 cls = self.__class__ 598 status = 'set' if self._flag else 'unset' 599 return f"<{cls.__module__}.{cls.__qualname__} at {id(self):#x}: {status}>" 600 601 def _at_fork_reinit(self): 602 # Private method called by Thread._after_fork() 603 self._cond._at_fork_reinit() 604 605 def is_set(self): 606 """Return true if and only if the internal flag is true.""" 607 return self._flag 608 609 def isSet(self): 610 """Return true if and only if the internal flag is true. 611 612 This method is deprecated, use is_set() instead. 613 614 """ 615 import warnings 616 warnings.warn('isSet() is deprecated, use is_set() instead', 617 DeprecationWarning, stacklevel=2) 618 return self.is_set() 619 620 def set(self): 621 """Set the internal flag to true. 622 623 All threads waiting for it to become true are awakened. Threads 624 that call wait() once the flag is true will not block at all. 625 626 """ 627 with self._cond: 628 self._flag = True 629 self._cond.notify_all() 630 631 def clear(self): 632 """Reset the internal flag to false. 633 634 Subsequently, threads calling wait() will block until set() is called to 635 set the internal flag to true again. 636 637 """ 638 with self._cond: 639 self._flag = False 640 641 def wait(self, timeout=None): 642 """Block until the internal flag is true. 643 644 If the internal flag is true on entry, return immediately. Otherwise, 645 block until another thread calls set() to set the flag to true, or until 646 the optional timeout occurs. 647 648 When the timeout argument is present and not None, it should be a 649 floating-point number specifying a timeout for the operation in seconds 650 (or fractions thereof). 651 652 This method returns the internal flag on exit, so it will always return 653 True except if a timeout is given and the operation times out. 654 655 """ 656 with self._cond: 657 signaled = self._flag 658 if not signaled: 659 signaled = self._cond.wait(timeout) 660 return signaled 661 662 663# A barrier class. Inspired in part by the pthread_barrier_* api and 664# the CyclicBarrier class from Java. See 665# http://sourceware.org/pthreads-win32/manual/pthread_barrier_init.html and 666# http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ 667# CyclicBarrier.html 668# for information. 669# We maintain two main states, 'filling' and 'draining' enabling the barrier 670# to be cyclic. Threads are not allowed into it until it has fully drained 671# since the previous cycle. In addition, a 'resetting' state exists which is 672# similar to 'draining' except that threads leave with a BrokenBarrierError, 673# and a 'broken' state in which all threads get the exception. 674class Barrier: 675 """Implements a Barrier. 676 677 Useful for synchronizing a fixed number of threads at known synchronization 678 points. Threads block on 'wait()' and are simultaneously awoken once they 679 have all made that call. 680 681 """ 682 683 def __init__(self, parties, action=None, timeout=None): 684 """Create a barrier, initialised to 'parties' threads. 685 686 'action' is a callable which, when supplied, will be called by one of 687 the threads after they have all entered the barrier and just prior to 688 releasing them all. If a 'timeout' is provided, it is used as the 689 default for all subsequent 'wait()' calls. 690 691 """ 692 if parties < 1: 693 raise ValueError("parties must be > 0") 694 self._cond = Condition(Lock()) 695 self._action = action 696 self._timeout = timeout 697 self._parties = parties 698 self._state = 0 # 0 filling, 1 draining, -1 resetting, -2 broken 699 self._count = 0 700 701 def __repr__(self): 702 cls = self.__class__ 703 if self.broken: 704 return f"<{cls.__module__}.{cls.__qualname__} at {id(self):#x}: broken>" 705 return (f"<{cls.__module__}.{cls.__qualname__} at {id(self):#x}:" 706 f" waiters={self.n_waiting}/{self.parties}>") 707 708 def wait(self, timeout=None): 709 """Wait for the barrier. 710 711 When the specified number of threads have started waiting, they are all 712 simultaneously awoken. If an 'action' was provided for the barrier, one 713 of the threads will have executed that callback prior to returning. 714 Returns an individual index number from 0 to 'parties-1'. 715 716 """ 717 if timeout is None: 718 timeout = self._timeout 719 with self._cond: 720 self._enter() # Block while the barrier drains. 721 index = self._count 722 self._count += 1 723 try: 724 if index + 1 == self._parties: 725 # We release the barrier 726 self._release() 727 else: 728 # We wait until someone releases us 729 self._wait(timeout) 730 return index 731 finally: 732 self._count -= 1 733 # Wake up any threads waiting for barrier to drain. 734 self._exit() 735 736 # Block until the barrier is ready for us, or raise an exception 737 # if it is broken. 738 def _enter(self): 739 while self._state in (-1, 1): 740 # It is draining or resetting, wait until done 741 self._cond.wait() 742 #see if the barrier is in a broken state 743 if self._state < 0: 744 raise BrokenBarrierError 745 assert self._state == 0 746 747 # Optionally run the 'action' and release the threads waiting 748 # in the barrier. 749 def _release(self): 750 try: 751 if self._action: 752 self._action() 753 # enter draining state 754 self._state = 1 755 self._cond.notify_all() 756 except: 757 #an exception during the _action handler. Break and reraise 758 self._break() 759 raise 760 761 # Wait in the barrier until we are released. Raise an exception 762 # if the barrier is reset or broken. 763 def _wait(self, timeout): 764 if not self._cond.wait_for(lambda : self._state != 0, timeout): 765 #timed out. Break the barrier 766 self._break() 767 raise BrokenBarrierError 768 if self._state < 0: 769 raise BrokenBarrierError 770 assert self._state == 1 771 772 # If we are the last thread to exit the barrier, signal any threads 773 # waiting for the barrier to drain. 774 def _exit(self): 775 if self._count == 0: 776 if self._state in (-1, 1): 777 #resetting or draining 778 self._state = 0 779 self._cond.notify_all() 780 781 def reset(self): 782 """Reset the barrier to the initial state. 783 784 Any threads currently waiting will get the BrokenBarrier exception 785 raised. 786 787 """ 788 with self._cond: 789 if self._count > 0: 790 if self._state == 0: 791 #reset the barrier, waking up threads 792 self._state = -1 793 elif self._state == -2: 794 #was broken, set it to reset state 795 #which clears when the last thread exits 796 self._state = -1 797 else: 798 self._state = 0 799 self._cond.notify_all() 800 801 def abort(self): 802 """Place the barrier into a 'broken' state. 803 804 Useful in case of error. Any currently waiting threads and threads 805 attempting to 'wait()' will have BrokenBarrierError raised. 806 807 """ 808 with self._cond: 809 self._break() 810 811 def _break(self): 812 # An internal error was detected. The barrier is set to 813 # a broken state all parties awakened. 814 self._state = -2 815 self._cond.notify_all() 816 817 @property 818 def parties(self): 819 """Return the number of threads required to trip the barrier.""" 820 return self._parties 821 822 @property 823 def n_waiting(self): 824 """Return the number of threads currently waiting at the barrier.""" 825 # We don't need synchronization here since this is an ephemeral result 826 # anyway. It returns the correct value in the steady state. 827 if self._state == 0: 828 return self._count 829 return 0 830 831 @property 832 def broken(self): 833 """Return True if the barrier is in a broken state.""" 834 return self._state == -2 835 836# exception raised by the Barrier class 837class BrokenBarrierError(RuntimeError): 838 pass 839 840 841# Helper to generate new thread names 842_counter = _count(1).__next__ 843def _newname(name_template): 844 return name_template % _counter() 845 846# Active thread administration. 847# 848# bpo-44422: Use a reentrant lock to allow reentrant calls to functions like 849# threading.enumerate(). 850_active_limbo_lock = RLock() 851_active = {} # maps thread id to Thread object 852_limbo = {} 853_dangling = WeakSet() 854 855 856# Main class for threads 857 858class Thread: 859 """A class that represents a thread of control. 860 861 This class can be safely subclassed in a limited fashion. There are two ways 862 to specify the activity: by passing a callable object to the constructor, or 863 by overriding the run() method in a subclass. 864 865 """ 866 867 _initialized = False 868 869 def __init__(self, group=None, target=None, name=None, 870 args=(), kwargs=None, *, daemon=None): 871 """This constructor should always be called with keyword arguments. Arguments are: 872 873 *group* should be None; reserved for future extension when a ThreadGroup 874 class is implemented. 875 876 *target* is the callable object to be invoked by the run() 877 method. Defaults to None, meaning nothing is called. 878 879 *name* is the thread name. By default, a unique name is constructed of 880 the form "Thread-N" where N is a small decimal number. 881 882 *args* is a list or tuple of arguments for the target invocation. Defaults to (). 883 884 *kwargs* is a dictionary of keyword arguments for the target 885 invocation. Defaults to {}. 886 887 If a subclass overrides the constructor, it must make sure to invoke 888 the base class constructor (Thread.__init__()) before doing anything 889 else to the thread. 890 891 """ 892 assert group is None, "group argument must be None for now" 893 if kwargs is None: 894 kwargs = {} 895 if name: 896 name = str(name) 897 else: 898 name = _newname("Thread-%d") 899 if target is not None: 900 try: 901 target_name = target.__name__ 902 name += f" ({target_name})" 903 except AttributeError: 904 pass 905 906 self._target = target 907 self._name = name 908 self._args = args 909 self._kwargs = kwargs 910 if daemon is not None: 911 if daemon and not _daemon_threads_allowed(): 912 raise RuntimeError('daemon threads are disabled in this (sub)interpreter') 913 self._daemonic = daemon 914 else: 915 self._daemonic = current_thread().daemon 916 self._ident = None 917 if _HAVE_THREAD_NATIVE_ID: 918 self._native_id = None 919 self._handle = _ThreadHandle() 920 self._started = Event() 921 self._initialized = True 922 # Copy of sys.stderr used by self._invoke_excepthook() 923 self._stderr = _sys.stderr 924 self._invoke_excepthook = _make_invoke_excepthook() 925 # For debugging and _after_fork() 926 _dangling.add(self) 927 928 def _after_fork(self, new_ident=None): 929 # Private! Called by threading._after_fork(). 930 self._started._at_fork_reinit() 931 if new_ident is not None: 932 # This thread is alive. 933 self._ident = new_ident 934 assert self._handle.ident == new_ident 935 else: 936 # Otherwise, the thread is dead, Jim. _PyThread_AfterFork() 937 # already marked our handle done. 938 pass 939 940 def __repr__(self): 941 assert self._initialized, "Thread.__init__() was not called" 942 status = "initial" 943 if self._started.is_set(): 944 status = "started" 945 if self._handle.is_done(): 946 status = "stopped" 947 if self._daemonic: 948 status += " daemon" 949 if self._ident is not None: 950 status += " %s" % self._ident 951 return "<%s(%s, %s)>" % (self.__class__.__name__, self._name, status) 952 953 def start(self): 954 """Start the thread's activity. 955 956 It must be called at most once per thread object. It arranges for the 957 object's run() method to be invoked in a separate thread of control. 958 959 This method will raise a RuntimeError if called more than once on the 960 same thread object. 961 962 """ 963 if not self._initialized: 964 raise RuntimeError("thread.__init__() not called") 965 966 if self._started.is_set(): 967 raise RuntimeError("threads can only be started once") 968 969 with _active_limbo_lock: 970 _limbo[self] = self 971 try: 972 # Start joinable thread 973 _start_joinable_thread(self._bootstrap, handle=self._handle, 974 daemon=self.daemon) 975 except Exception: 976 with _active_limbo_lock: 977 del _limbo[self] 978 raise 979 self._started.wait() # Will set ident and native_id 980 981 def run(self): 982 """Method representing the thread's activity. 983 984 You may override this method in a subclass. The standard run() method 985 invokes the callable object passed to the object's constructor as the 986 target argument, if any, with sequential and keyword arguments taken 987 from the args and kwargs arguments, respectively. 988 989 """ 990 try: 991 if self._target is not None: 992 self._target(*self._args, **self._kwargs) 993 finally: 994 # Avoid a refcycle if the thread is running a function with 995 # an argument that has a member that points to the thread. 996 del self._target, self._args, self._kwargs 997 998 def _bootstrap(self): 999 # Wrapper around the real bootstrap code that ignores 1000 # exceptions during interpreter cleanup. Those typically 1001 # happen when a daemon thread wakes up at an unfortunate 1002 # moment, finds the world around it destroyed, and raises some 1003 # random exception *** while trying to report the exception in 1004 # _bootstrap_inner() below ***. Those random exceptions 1005 # don't help anybody, and they confuse users, so we suppress 1006 # them. We suppress them only when it appears that the world 1007 # indeed has already been destroyed, so that exceptions in 1008 # _bootstrap_inner() during normal business hours are properly 1009 # reported. Also, we only suppress them for daemonic threads; 1010 # if a non-daemonic encounters this, something else is wrong. 1011 try: 1012 self._bootstrap_inner() 1013 except: 1014 if self._daemonic and _sys is None: 1015 return 1016 raise 1017 1018 def _set_ident(self): 1019 self._ident = get_ident() 1020 1021 if _HAVE_THREAD_NATIVE_ID: 1022 def _set_native_id(self): 1023 self._native_id = get_native_id() 1024 1025 def _bootstrap_inner(self): 1026 try: 1027 self._set_ident() 1028 if _HAVE_THREAD_NATIVE_ID: 1029 self._set_native_id() 1030 self._started.set() 1031 with _active_limbo_lock: 1032 _active[self._ident] = self 1033 del _limbo[self] 1034 1035 if _trace_hook: 1036 _sys.settrace(_trace_hook) 1037 if _profile_hook: 1038 _sys.setprofile(_profile_hook) 1039 1040 try: 1041 self.run() 1042 except: 1043 self._invoke_excepthook(self) 1044 finally: 1045 self._delete() 1046 1047 def _delete(self): 1048 "Remove current thread from the dict of currently running threads." 1049 with _active_limbo_lock: 1050 del _active[get_ident()] 1051 # There must not be any python code between the previous line 1052 # and after the lock is released. Otherwise a tracing function 1053 # could try to acquire the lock again in the same thread, (in 1054 # current_thread()), and would block. 1055 1056 def join(self, timeout=None): 1057 """Wait until the thread terminates. 1058 1059 This blocks the calling thread until the thread whose join() method is 1060 called terminates -- either normally or through an unhandled exception 1061 or until the optional timeout occurs. 1062 1063 When the timeout argument is present and not None, it should be a 1064 floating-point number specifying a timeout for the operation in seconds 1065 (or fractions thereof). As join() always returns None, you must call 1066 is_alive() after join() to decide whether a timeout happened -- if the 1067 thread is still alive, the join() call timed out. 1068 1069 When the timeout argument is not present or None, the operation will 1070 block until the thread terminates. 1071 1072 A thread can be join()ed many times. 1073 1074 join() raises a RuntimeError if an attempt is made to join the current 1075 thread as that would cause a deadlock. It is also an error to join() a 1076 thread before it has been started and attempts to do so raises the same 1077 exception. 1078 1079 """ 1080 if not self._initialized: 1081 raise RuntimeError("Thread.__init__() not called") 1082 if not self._started.is_set(): 1083 raise RuntimeError("cannot join thread before it is started") 1084 if self is current_thread(): 1085 raise RuntimeError("cannot join current thread") 1086 1087 # the behavior of a negative timeout isn't documented, but 1088 # historically .join(timeout=x) for x<0 has acted as if timeout=0 1089 if timeout is not None: 1090 timeout = max(timeout, 0) 1091 1092 self._handle.join(timeout) 1093 1094 @property 1095 def name(self): 1096 """A string used for identification purposes only. 1097 1098 It has no semantics. Multiple threads may be given the same name. The 1099 initial name is set by the constructor. 1100 1101 """ 1102 assert self._initialized, "Thread.__init__() not called" 1103 return self._name 1104 1105 @name.setter 1106 def name(self, name): 1107 assert self._initialized, "Thread.__init__() not called" 1108 self._name = str(name) 1109 1110 @property 1111 def ident(self): 1112 """Thread identifier of this thread or None if it has not been started. 1113 1114 This is a nonzero integer. See the get_ident() function. Thread 1115 identifiers may be recycled when a thread exits and another thread is 1116 created. The identifier is available even after the thread has exited. 1117 1118 """ 1119 assert self._initialized, "Thread.__init__() not called" 1120 return self._ident 1121 1122 if _HAVE_THREAD_NATIVE_ID: 1123 @property 1124 def native_id(self): 1125 """Native integral thread ID of this thread, or None if it has not been started. 1126 1127 This is a non-negative integer. See the get_native_id() function. 1128 This represents the Thread ID as reported by the kernel. 1129 1130 """ 1131 assert self._initialized, "Thread.__init__() not called" 1132 return self._native_id 1133 1134 def is_alive(self): 1135 """Return whether the thread is alive. 1136 1137 This method returns True just before the run() method starts until just 1138 after the run() method terminates. See also the module function 1139 enumerate(). 1140 1141 """ 1142 assert self._initialized, "Thread.__init__() not called" 1143 return self._started.is_set() and not self._handle.is_done() 1144 1145 @property 1146 def daemon(self): 1147 """A boolean value indicating whether this thread is a daemon thread. 1148 1149 This must be set before start() is called, otherwise RuntimeError is 1150 raised. Its initial value is inherited from the creating thread; the 1151 main thread is not a daemon thread and therefore all threads created in 1152 the main thread default to daemon = False. 1153 1154 The entire Python program exits when only daemon threads are left. 1155 1156 """ 1157 assert self._initialized, "Thread.__init__() not called" 1158 return self._daemonic 1159 1160 @daemon.setter 1161 def daemon(self, daemonic): 1162 if not self._initialized: 1163 raise RuntimeError("Thread.__init__() not called") 1164 if daemonic and not _daemon_threads_allowed(): 1165 raise RuntimeError('daemon threads are disabled in this interpreter') 1166 if self._started.is_set(): 1167 raise RuntimeError("cannot set daemon status of active thread") 1168 self._daemonic = daemonic 1169 1170 def isDaemon(self): 1171 """Return whether this thread is a daemon. 1172 1173 This method is deprecated, use the daemon attribute instead. 1174 1175 """ 1176 import warnings 1177 warnings.warn('isDaemon() is deprecated, get the daemon attribute instead', 1178 DeprecationWarning, stacklevel=2) 1179 return self.daemon 1180 1181 def setDaemon(self, daemonic): 1182 """Set whether this thread is a daemon. 1183 1184 This method is deprecated, use the .daemon property instead. 1185 1186 """ 1187 import warnings 1188 warnings.warn('setDaemon() is deprecated, set the daemon attribute instead', 1189 DeprecationWarning, stacklevel=2) 1190 self.daemon = daemonic 1191 1192 def getName(self): 1193 """Return a string used for identification purposes only. 1194 1195 This method is deprecated, use the name attribute instead. 1196 1197 """ 1198 import warnings 1199 warnings.warn('getName() is deprecated, get the name attribute instead', 1200 DeprecationWarning, stacklevel=2) 1201 return self.name 1202 1203 def setName(self, name): 1204 """Set the name string for this thread. 1205 1206 This method is deprecated, use the name attribute instead. 1207 1208 """ 1209 import warnings 1210 warnings.warn('setName() is deprecated, set the name attribute instead', 1211 DeprecationWarning, stacklevel=2) 1212 self.name = name 1213 1214 1215try: 1216 from _thread import (_excepthook as excepthook, 1217 _ExceptHookArgs as ExceptHookArgs) 1218except ImportError: 1219 # Simple Python implementation if _thread._excepthook() is not available 1220 from traceback import print_exception as _print_exception 1221 from collections import namedtuple 1222 1223 _ExceptHookArgs = namedtuple( 1224 'ExceptHookArgs', 1225 'exc_type exc_value exc_traceback thread') 1226 1227 def ExceptHookArgs(args): 1228 return _ExceptHookArgs(*args) 1229 1230 def excepthook(args, /): 1231 """ 1232 Handle uncaught Thread.run() exception. 1233 """ 1234 if args.exc_type == SystemExit: 1235 # silently ignore SystemExit 1236 return 1237 1238 if _sys is not None and _sys.stderr is not None: 1239 stderr = _sys.stderr 1240 elif args.thread is not None: 1241 stderr = args.thread._stderr 1242 if stderr is None: 1243 # do nothing if sys.stderr is None and sys.stderr was None 1244 # when the thread was created 1245 return 1246 else: 1247 # do nothing if sys.stderr is None and args.thread is None 1248 return 1249 1250 if args.thread is not None: 1251 name = args.thread.name 1252 else: 1253 name = get_ident() 1254 print(f"Exception in thread {name}:", 1255 file=stderr, flush=True) 1256 _print_exception(args.exc_type, args.exc_value, args.exc_traceback, 1257 file=stderr) 1258 stderr.flush() 1259 1260 1261# Original value of threading.excepthook 1262__excepthook__ = excepthook 1263 1264 1265def _make_invoke_excepthook(): 1266 # Create a local namespace to ensure that variables remain alive 1267 # when _invoke_excepthook() is called, even if it is called late during 1268 # Python shutdown. It is mostly needed for daemon threads. 1269 1270 old_excepthook = excepthook 1271 old_sys_excepthook = _sys.excepthook 1272 if old_excepthook is None: 1273 raise RuntimeError("threading.excepthook is None") 1274 if old_sys_excepthook is None: 1275 raise RuntimeError("sys.excepthook is None") 1276 1277 sys_exc_info = _sys.exc_info 1278 local_print = print 1279 local_sys = _sys 1280 1281 def invoke_excepthook(thread): 1282 global excepthook 1283 try: 1284 hook = excepthook 1285 if hook is None: 1286 hook = old_excepthook 1287 1288 args = ExceptHookArgs([*sys_exc_info(), thread]) 1289 1290 hook(args) 1291 except Exception as exc: 1292 exc.__suppress_context__ = True 1293 del exc 1294 1295 if local_sys is not None and local_sys.stderr is not None: 1296 stderr = local_sys.stderr 1297 else: 1298 stderr = thread._stderr 1299 1300 local_print("Exception in threading.excepthook:", 1301 file=stderr, flush=True) 1302 1303 if local_sys is not None and local_sys.excepthook is not None: 1304 sys_excepthook = local_sys.excepthook 1305 else: 1306 sys_excepthook = old_sys_excepthook 1307 1308 sys_excepthook(*sys_exc_info()) 1309 finally: 1310 # Break reference cycle (exception stored in a variable) 1311 args = None 1312 1313 return invoke_excepthook 1314 1315 1316# The timer class was contributed by Itamar Shtull-Trauring 1317 1318class Timer(Thread): 1319 """Call a function after a specified number of seconds: 1320 1321 t = Timer(30.0, f, args=None, kwargs=None) 1322 t.start() 1323 t.cancel() # stop the timer's action if it's still waiting 1324 1325 """ 1326 1327 def __init__(self, interval, function, args=None, kwargs=None): 1328 Thread.__init__(self) 1329 self.interval = interval 1330 self.function = function 1331 self.args = args if args is not None else [] 1332 self.kwargs = kwargs if kwargs is not None else {} 1333 self.finished = Event() 1334 1335 def cancel(self): 1336 """Stop the timer if it hasn't finished yet.""" 1337 self.finished.set() 1338 1339 def run(self): 1340 self.finished.wait(self.interval) 1341 if not self.finished.is_set(): 1342 self.function(*self.args, **self.kwargs) 1343 self.finished.set() 1344 1345 1346# Special thread class to represent the main thread 1347 1348class _MainThread(Thread): 1349 1350 def __init__(self): 1351 Thread.__init__(self, name="MainThread", daemon=False) 1352 self._started.set() 1353 self._ident = _get_main_thread_ident() 1354 self._handle = _make_thread_handle(self._ident) 1355 if _HAVE_THREAD_NATIVE_ID: 1356 self._set_native_id() 1357 with _active_limbo_lock: 1358 _active[self._ident] = self 1359 1360 1361# Helper thread-local instance to detect when a _DummyThread 1362# is collected. Not a part of the public API. 1363_thread_local_info = local() 1364 1365 1366class _DeleteDummyThreadOnDel: 1367 ''' 1368 Helper class to remove a dummy thread from threading._active on __del__. 1369 ''' 1370 1371 def __init__(self, dummy_thread): 1372 self._dummy_thread = dummy_thread 1373 self._tident = dummy_thread.ident 1374 # Put the thread on a thread local variable so that when 1375 # the related thread finishes this instance is collected. 1376 # 1377 # Note: no other references to this instance may be created. 1378 # If any client code creates a reference to this instance, 1379 # the related _DummyThread will be kept forever! 1380 _thread_local_info._track_dummy_thread_ref = self 1381 1382 def __del__(self): 1383 with _active_limbo_lock: 1384 if _active.get(self._tident) is self._dummy_thread: 1385 _active.pop(self._tident, None) 1386 1387 1388# Dummy thread class to represent threads not started here. 1389# These should be added to `_active` and removed automatically 1390# when they die, although they can't be waited for. 1391# Their purpose is to return *something* from current_thread(). 1392# They are marked as daemon threads so we won't wait for them 1393# when we exit (conform previous semantics). 1394 1395class _DummyThread(Thread): 1396 1397 def __init__(self): 1398 Thread.__init__(self, name=_newname("Dummy-%d"), 1399 daemon=_daemon_threads_allowed()) 1400 self._started.set() 1401 self._set_ident() 1402 self._handle = _make_thread_handle(self._ident) 1403 if _HAVE_THREAD_NATIVE_ID: 1404 self._set_native_id() 1405 with _active_limbo_lock: 1406 _active[self._ident] = self 1407 _DeleteDummyThreadOnDel(self) 1408 1409 def is_alive(self): 1410 if not self._handle.is_done() and self._started.is_set(): 1411 return True 1412 raise RuntimeError("thread is not alive") 1413 1414 def join(self, timeout=None): 1415 raise RuntimeError("cannot join a dummy thread") 1416 1417 def _after_fork(self, new_ident=None): 1418 if new_ident is not None: 1419 self.__class__ = _MainThread 1420 self._name = 'MainThread' 1421 self._daemonic = False 1422 Thread._after_fork(self, new_ident=new_ident) 1423 1424 1425# Global API functions 1426 1427def current_thread(): 1428 """Return the current Thread object, corresponding to the caller's thread of control. 1429 1430 If the caller's thread of control was not created through the threading 1431 module, a dummy thread object with limited functionality is returned. 1432 1433 """ 1434 try: 1435 return _active[get_ident()] 1436 except KeyError: 1437 return _DummyThread() 1438 1439def currentThread(): 1440 """Return the current Thread object, corresponding to the caller's thread of control. 1441 1442 This function is deprecated, use current_thread() instead. 1443 1444 """ 1445 import warnings 1446 warnings.warn('currentThread() is deprecated, use current_thread() instead', 1447 DeprecationWarning, stacklevel=2) 1448 return current_thread() 1449 1450def active_count(): 1451 """Return the number of Thread objects currently alive. 1452 1453 The returned count is equal to the length of the list returned by 1454 enumerate(). 1455 1456 """ 1457 # NOTE: if the logic in here ever changes, update Modules/posixmodule.c 1458 # warn_about_fork_with_threads() to match. 1459 with _active_limbo_lock: 1460 return len(_active) + len(_limbo) 1461 1462def activeCount(): 1463 """Return the number of Thread objects currently alive. 1464 1465 This function is deprecated, use active_count() instead. 1466 1467 """ 1468 import warnings 1469 warnings.warn('activeCount() is deprecated, use active_count() instead', 1470 DeprecationWarning, stacklevel=2) 1471 return active_count() 1472 1473def _enumerate(): 1474 # Same as enumerate(), but without the lock. Internal use only. 1475 return list(_active.values()) + list(_limbo.values()) 1476 1477def enumerate(): 1478 """Return a list of all Thread objects currently alive. 1479 1480 The list includes daemonic threads, dummy thread objects created by 1481 current_thread(), and the main thread. It excludes terminated threads and 1482 threads that have not yet been started. 1483 1484 """ 1485 with _active_limbo_lock: 1486 return list(_active.values()) + list(_limbo.values()) 1487 1488 1489_threading_atexits = [] 1490_SHUTTING_DOWN = False 1491 1492def _register_atexit(func, *arg, **kwargs): 1493 """CPython internal: register *func* to be called before joining threads. 1494 1495 The registered *func* is called with its arguments just before all 1496 non-daemon threads are joined in `_shutdown()`. It provides a similar 1497 purpose to `atexit.register()`, but its functions are called prior to 1498 threading shutdown instead of interpreter shutdown. 1499 1500 For similarity to atexit, the registered functions are called in reverse. 1501 """ 1502 if _SHUTTING_DOWN: 1503 raise RuntimeError("can't register atexit after shutdown") 1504 1505 _threading_atexits.append(lambda: func(*arg, **kwargs)) 1506 1507 1508from _thread import stack_size 1509 1510# Create the main thread object, 1511# and make it available for the interpreter 1512# (Py_Main) as threading._shutdown. 1513 1514_main_thread = _MainThread() 1515 1516def _shutdown(): 1517 """ 1518 Wait until the Python thread state of all non-daemon threads get deleted. 1519 """ 1520 # Obscure: other threads may be waiting to join _main_thread. That's 1521 # dubious, but some code does it. We can't wait for it to be marked as done 1522 # normally - that won't happen until the interpreter is nearly dead. So 1523 # mark it done here. 1524 if _main_thread._handle.is_done() and _is_main_interpreter(): 1525 # _shutdown() was already called 1526 return 1527 1528 global _SHUTTING_DOWN 1529 _SHUTTING_DOWN = True 1530 1531 # Call registered threading atexit functions before threads are joined. 1532 # Order is reversed, similar to atexit. 1533 for atexit_call in reversed(_threading_atexits): 1534 atexit_call() 1535 1536 if _is_main_interpreter(): 1537 _main_thread._handle._set_done() 1538 1539 # Wait for all non-daemon threads to exit. 1540 _thread_shutdown() 1541 1542 1543def main_thread(): 1544 """Return the main thread object. 1545 1546 In normal conditions, the main thread is the thread from which the 1547 Python interpreter was started. 1548 """ 1549 # XXX Figure this out for subinterpreters. (See gh-75698.) 1550 return _main_thread 1551 1552 1553def _after_fork(): 1554 """ 1555 Cleanup threading module state that should not exist after a fork. 1556 """ 1557 # Reset _active_limbo_lock, in case we forked while the lock was held 1558 # by another (non-forked) thread. http://bugs.python.org/issue874900 1559 global _active_limbo_lock, _main_thread 1560 _active_limbo_lock = RLock() 1561 1562 # fork() only copied the current thread; clear references to others. 1563 new_active = {} 1564 1565 try: 1566 current = _active[get_ident()] 1567 except KeyError: 1568 # fork() was called in a thread which was not spawned 1569 # by threading.Thread. For example, a thread spawned 1570 # by thread.start_new_thread(). 1571 current = _MainThread() 1572 1573 _main_thread = current 1574 1575 with _active_limbo_lock: 1576 # Dangling thread instances must still have their locks reset, 1577 # because someone may join() them. 1578 threads = set(_enumerate()) 1579 threads.update(_dangling) 1580 for thread in threads: 1581 # Any lock/condition variable may be currently locked or in an 1582 # invalid state, so we reinitialize them. 1583 if thread is current: 1584 # This is the one and only active thread. 1585 ident = get_ident() 1586 thread._after_fork(new_ident=ident) 1587 new_active[ident] = thread 1588 else: 1589 # All the others are already stopped. 1590 thread._after_fork() 1591 1592 _limbo.clear() 1593 _active.clear() 1594 _active.update(new_active) 1595 assert len(_active) == 1 1596 1597 1598if hasattr(_os, "register_at_fork"): 1599 _os.register_at_fork(after_in_child=_after_fork) 1600