1"""Utilities for with-statement contexts. See PEP 343.""" 2import abc 3import os 4import sys 5import _collections_abc 6from collections import deque 7from functools import wraps 8from types import MethodType, GenericAlias 9 10__all__ = ["asynccontextmanager", "contextmanager", "closing", "nullcontext", 11 "AbstractContextManager", "AbstractAsyncContextManager", 12 "AsyncExitStack", "ContextDecorator", "ExitStack", 13 "redirect_stdout", "redirect_stderr", "suppress", "aclosing", 14 "chdir"] 15 16 17class AbstractContextManager(abc.ABC): 18 19 """An abstract base class for context managers.""" 20 21 __class_getitem__ = classmethod(GenericAlias) 22 23 __slots__ = () 24 25 def __enter__(self): 26 """Return `self` upon entering the runtime context.""" 27 return self 28 29 @abc.abstractmethod 30 def __exit__(self, exc_type, exc_value, traceback): 31 """Raise any exception triggered within the runtime context.""" 32 return None 33 34 @classmethod 35 def __subclasshook__(cls, C): 36 if cls is AbstractContextManager: 37 return _collections_abc._check_methods(C, "__enter__", "__exit__") 38 return NotImplemented 39 40 41class AbstractAsyncContextManager(abc.ABC): 42 43 """An abstract base class for asynchronous context managers.""" 44 45 __class_getitem__ = classmethod(GenericAlias) 46 47 __slots__ = () 48 49 async def __aenter__(self): 50 """Return `self` upon entering the runtime context.""" 51 return self 52 53 @abc.abstractmethod 54 async def __aexit__(self, exc_type, exc_value, traceback): 55 """Raise any exception triggered within the runtime context.""" 56 return None 57 58 @classmethod 59 def __subclasshook__(cls, C): 60 if cls is AbstractAsyncContextManager: 61 return _collections_abc._check_methods(C, "__aenter__", 62 "__aexit__") 63 return NotImplemented 64 65 66class ContextDecorator(object): 67 "A base class or mixin that enables context managers to work as decorators." 68 69 def _recreate_cm(self): 70 """Return a recreated instance of self. 71 72 Allows an otherwise one-shot context manager like 73 _GeneratorContextManager to support use as 74 a decorator via implicit recreation. 75 76 This is a private interface just for _GeneratorContextManager. 77 See issue #11647 for details. 78 """ 79 return self 80 81 def __call__(self, func): 82 @wraps(func) 83 def inner(*args, **kwds): 84 with self._recreate_cm(): 85 return func(*args, **kwds) 86 return inner 87 88 89class AsyncContextDecorator(object): 90 "A base class or mixin that enables async context managers to work as decorators." 91 92 def _recreate_cm(self): 93 """Return a recreated instance of self. 94 """ 95 return self 96 97 def __call__(self, func): 98 @wraps(func) 99 async def inner(*args, **kwds): 100 async with self._recreate_cm(): 101 return await func(*args, **kwds) 102 return inner 103 104 105class _GeneratorContextManagerBase: 106 """Shared functionality for @contextmanager and @asynccontextmanager.""" 107 108 def __init__(self, func, args, kwds): 109 self.gen = func(*args, **kwds) 110 self.func, self.args, self.kwds = func, args, kwds 111 # Issue 19330: ensure context manager instances have good docstrings 112 doc = getattr(func, "__doc__", None) 113 if doc is None: 114 doc = type(self).__doc__ 115 self.__doc__ = doc 116 # Unfortunately, this still doesn't provide good help output when 117 # inspecting the created context manager instances, since pydoc 118 # currently bypasses the instance docstring and shows the docstring 119 # for the class instead. 120 # See http://bugs.python.org/issue19404 for more details. 121 122 def _recreate_cm(self): 123 # _GCMB instances are one-shot context managers, so the 124 # CM must be recreated each time a decorated function is 125 # called 126 return self.__class__(self.func, self.args, self.kwds) 127 128 129class _GeneratorContextManager( 130 _GeneratorContextManagerBase, 131 AbstractContextManager, 132 ContextDecorator, 133): 134 """Helper for @contextmanager decorator.""" 135 136 def __enter__(self): 137 # do not keep args and kwds alive unnecessarily 138 # they are only needed for recreation, which is not possible anymore 139 del self.args, self.kwds, self.func 140 try: 141 return next(self.gen) 142 except StopIteration: 143 raise RuntimeError("generator didn't yield") from None 144 145 def __exit__(self, typ, value, traceback): 146 if typ is None: 147 try: 148 next(self.gen) 149 except StopIteration: 150 return False 151 else: 152 try: 153 raise RuntimeError("generator didn't stop") 154 finally: 155 self.gen.close() 156 else: 157 if value is None: 158 # Need to force instantiation so we can reliably 159 # tell if we get the same exception back 160 value = typ() 161 try: 162 self.gen.throw(value) 163 except StopIteration as exc: 164 # Suppress StopIteration *unless* it's the same exception that 165 # was passed to throw(). This prevents a StopIteration 166 # raised inside the "with" statement from being suppressed. 167 return exc is not value 168 except RuntimeError as exc: 169 # Don't re-raise the passed in exception. (issue27122) 170 if exc is value: 171 exc.__traceback__ = traceback 172 return False 173 # Avoid suppressing if a StopIteration exception 174 # was passed to throw() and later wrapped into a RuntimeError 175 # (see PEP 479 for sync generators; async generators also 176 # have this behavior). But do this only if the exception wrapped 177 # by the RuntimeError is actually Stop(Async)Iteration (see 178 # issue29692). 179 if ( 180 isinstance(value, StopIteration) 181 and exc.__cause__ is value 182 ): 183 value.__traceback__ = traceback 184 return False 185 raise 186 except BaseException as exc: 187 # only re-raise if it's *not* the exception that was 188 # passed to throw(), because __exit__() must not raise 189 # an exception unless __exit__() itself failed. But throw() 190 # has to raise the exception to signal propagation, so this 191 # fixes the impedance mismatch between the throw() protocol 192 # and the __exit__() protocol. 193 if exc is not value: 194 raise 195 exc.__traceback__ = traceback 196 return False 197 try: 198 raise RuntimeError("generator didn't stop after throw()") 199 finally: 200 self.gen.close() 201 202class _AsyncGeneratorContextManager( 203 _GeneratorContextManagerBase, 204 AbstractAsyncContextManager, 205 AsyncContextDecorator, 206): 207 """Helper for @asynccontextmanager decorator.""" 208 209 async def __aenter__(self): 210 # do not keep args and kwds alive unnecessarily 211 # they are only needed for recreation, which is not possible anymore 212 del self.args, self.kwds, self.func 213 try: 214 return await anext(self.gen) 215 except StopAsyncIteration: 216 raise RuntimeError("generator didn't yield") from None 217 218 async def __aexit__(self, typ, value, traceback): 219 if typ is None: 220 try: 221 await anext(self.gen) 222 except StopAsyncIteration: 223 return False 224 else: 225 try: 226 raise RuntimeError("generator didn't stop") 227 finally: 228 await self.gen.aclose() 229 else: 230 if value is None: 231 # Need to force instantiation so we can reliably 232 # tell if we get the same exception back 233 value = typ() 234 try: 235 await self.gen.athrow(value) 236 except StopAsyncIteration as exc: 237 # Suppress StopIteration *unless* it's the same exception that 238 # was passed to throw(). This prevents a StopIteration 239 # raised inside the "with" statement from being suppressed. 240 return exc is not value 241 except RuntimeError as exc: 242 # Don't re-raise the passed in exception. (issue27122) 243 if exc is value: 244 exc.__traceback__ = traceback 245 return False 246 # Avoid suppressing if a Stop(Async)Iteration exception 247 # was passed to athrow() and later wrapped into a RuntimeError 248 # (see PEP 479 for sync generators; async generators also 249 # have this behavior). But do this only if the exception wrapped 250 # by the RuntimeError is actually Stop(Async)Iteration (see 251 # issue29692). 252 if ( 253 isinstance(value, (StopIteration, StopAsyncIteration)) 254 and exc.__cause__ is value 255 ): 256 value.__traceback__ = traceback 257 return False 258 raise 259 except BaseException as exc: 260 # only re-raise if it's *not* the exception that was 261 # passed to throw(), because __exit__() must not raise 262 # an exception unless __exit__() itself failed. But throw() 263 # has to raise the exception to signal propagation, so this 264 # fixes the impedance mismatch between the throw() protocol 265 # and the __exit__() protocol. 266 if exc is not value: 267 raise 268 exc.__traceback__ = traceback 269 return False 270 try: 271 raise RuntimeError("generator didn't stop after athrow()") 272 finally: 273 await self.gen.aclose() 274 275 276def contextmanager(func): 277 """@contextmanager decorator. 278 279 Typical usage: 280 281 @contextmanager 282 def some_generator(<arguments>): 283 <setup> 284 try: 285 yield <value> 286 finally: 287 <cleanup> 288 289 This makes this: 290 291 with some_generator(<arguments>) as <variable>: 292 <body> 293 294 equivalent to this: 295 296 <setup> 297 try: 298 <variable> = <value> 299 <body> 300 finally: 301 <cleanup> 302 """ 303 @wraps(func) 304 def helper(*args, **kwds): 305 return _GeneratorContextManager(func, args, kwds) 306 return helper 307 308 309def asynccontextmanager(func): 310 """@asynccontextmanager decorator. 311 312 Typical usage: 313 314 @asynccontextmanager 315 async def some_async_generator(<arguments>): 316 <setup> 317 try: 318 yield <value> 319 finally: 320 <cleanup> 321 322 This makes this: 323 324 async with some_async_generator(<arguments>) as <variable>: 325 <body> 326 327 equivalent to this: 328 329 <setup> 330 try: 331 <variable> = <value> 332 <body> 333 finally: 334 <cleanup> 335 """ 336 @wraps(func) 337 def helper(*args, **kwds): 338 return _AsyncGeneratorContextManager(func, args, kwds) 339 return helper 340 341 342class closing(AbstractContextManager): 343 """Context to automatically close something at the end of a block. 344 345 Code like this: 346 347 with closing(<module>.open(<arguments>)) as f: 348 <block> 349 350 is equivalent to this: 351 352 f = <module>.open(<arguments>) 353 try: 354 <block> 355 finally: 356 f.close() 357 358 """ 359 def __init__(self, thing): 360 self.thing = thing 361 def __enter__(self): 362 return self.thing 363 def __exit__(self, *exc_info): 364 self.thing.close() 365 366 367class aclosing(AbstractAsyncContextManager): 368 """Async context manager for safely finalizing an asynchronously cleaned-up 369 resource such as an async generator, calling its ``aclose()`` method. 370 371 Code like this: 372 373 async with aclosing(<module>.fetch(<arguments>)) as agen: 374 <block> 375 376 is equivalent to this: 377 378 agen = <module>.fetch(<arguments>) 379 try: 380 <block> 381 finally: 382 await agen.aclose() 383 384 """ 385 def __init__(self, thing): 386 self.thing = thing 387 async def __aenter__(self): 388 return self.thing 389 async def __aexit__(self, *exc_info): 390 await self.thing.aclose() 391 392 393class _RedirectStream(AbstractContextManager): 394 395 _stream = None 396 397 def __init__(self, new_target): 398 self._new_target = new_target 399 # We use a list of old targets to make this CM re-entrant 400 self._old_targets = [] 401 402 def __enter__(self): 403 self._old_targets.append(getattr(sys, self._stream)) 404 setattr(sys, self._stream, self._new_target) 405 return self._new_target 406 407 def __exit__(self, exctype, excinst, exctb): 408 setattr(sys, self._stream, self._old_targets.pop()) 409 410 411class redirect_stdout(_RedirectStream): 412 """Context manager for temporarily redirecting stdout to another file. 413 414 # How to send help() to stderr 415 with redirect_stdout(sys.stderr): 416 help(dir) 417 418 # How to write help() to a file 419 with open('help.txt', 'w') as f: 420 with redirect_stdout(f): 421 help(pow) 422 """ 423 424 _stream = "stdout" 425 426 427class redirect_stderr(_RedirectStream): 428 """Context manager for temporarily redirecting stderr to another file.""" 429 430 _stream = "stderr" 431 432 433class suppress(AbstractContextManager): 434 """Context manager to suppress specified exceptions 435 436 After the exception is suppressed, execution proceeds with the next 437 statement following the with statement. 438 439 with suppress(FileNotFoundError): 440 os.remove(somefile) 441 # Execution still resumes here if the file was already removed 442 """ 443 444 def __init__(self, *exceptions): 445 self._exceptions = exceptions 446 447 def __enter__(self): 448 pass 449 450 def __exit__(self, exctype, excinst, exctb): 451 # Unlike isinstance and issubclass, CPython exception handling 452 # currently only looks at the concrete type hierarchy (ignoring 453 # the instance and subclass checking hooks). While Guido considers 454 # that a bug rather than a feature, it's a fairly hard one to fix 455 # due to various internal implementation details. suppress provides 456 # the simpler issubclass based semantics, rather than trying to 457 # exactly reproduce the limitations of the CPython interpreter. 458 # 459 # See http://bugs.python.org/issue12029 for more details 460 if exctype is None: 461 return 462 if issubclass(exctype, self._exceptions): 463 return True 464 if issubclass(exctype, BaseExceptionGroup): 465 match, rest = excinst.split(self._exceptions) 466 if rest is None: 467 return True 468 raise rest 469 return False 470 471 472class _BaseExitStack: 473 """A base class for ExitStack and AsyncExitStack.""" 474 475 @staticmethod 476 def _create_exit_wrapper(cm, cm_exit): 477 return MethodType(cm_exit, cm) 478 479 @staticmethod 480 def _create_cb_wrapper(callback, /, *args, **kwds): 481 def _exit_wrapper(exc_type, exc, tb): 482 callback(*args, **kwds) 483 return _exit_wrapper 484 485 def __init__(self): 486 self._exit_callbacks = deque() 487 488 def pop_all(self): 489 """Preserve the context stack by transferring it to a new instance.""" 490 new_stack = type(self)() 491 new_stack._exit_callbacks = self._exit_callbacks 492 self._exit_callbacks = deque() 493 return new_stack 494 495 def push(self, exit): 496 """Registers a callback with the standard __exit__ method signature. 497 498 Can suppress exceptions the same way __exit__ method can. 499 Also accepts any object with an __exit__ method (registering a call 500 to the method instead of the object itself). 501 """ 502 # We use an unbound method rather than a bound method to follow 503 # the standard lookup behaviour for special methods. 504 _cb_type = type(exit) 505 506 try: 507 exit_method = _cb_type.__exit__ 508 except AttributeError: 509 # Not a context manager, so assume it's a callable. 510 self._push_exit_callback(exit) 511 else: 512 self._push_cm_exit(exit, exit_method) 513 return exit # Allow use as a decorator. 514 515 def enter_context(self, cm): 516 """Enters the supplied context manager. 517 518 If successful, also pushes its __exit__ method as a callback and 519 returns the result of the __enter__ method. 520 """ 521 # We look up the special methods on the type to match the with 522 # statement. 523 cls = type(cm) 524 try: 525 _enter = cls.__enter__ 526 _exit = cls.__exit__ 527 except AttributeError: 528 raise TypeError(f"'{cls.__module__}.{cls.__qualname__}' object does " 529 f"not support the context manager protocol") from None 530 result = _enter(cm) 531 self._push_cm_exit(cm, _exit) 532 return result 533 534 def callback(self, callback, /, *args, **kwds): 535 """Registers an arbitrary callback and arguments. 536 537 Cannot suppress exceptions. 538 """ 539 _exit_wrapper = self._create_cb_wrapper(callback, *args, **kwds) 540 541 # We changed the signature, so using @wraps is not appropriate, but 542 # setting __wrapped__ may still help with introspection. 543 _exit_wrapper.__wrapped__ = callback 544 self._push_exit_callback(_exit_wrapper) 545 return callback # Allow use as a decorator 546 547 def _push_cm_exit(self, cm, cm_exit): 548 """Helper to correctly register callbacks to __exit__ methods.""" 549 _exit_wrapper = self._create_exit_wrapper(cm, cm_exit) 550 self._push_exit_callback(_exit_wrapper, True) 551 552 def _push_exit_callback(self, callback, is_sync=True): 553 self._exit_callbacks.append((is_sync, callback)) 554 555 556# Inspired by discussions on http://bugs.python.org/issue13585 557class ExitStack(_BaseExitStack, AbstractContextManager): 558 """Context manager for dynamic management of a stack of exit callbacks. 559 560 For example: 561 with ExitStack() as stack: 562 files = [stack.enter_context(open(fname)) for fname in filenames] 563 # All opened files will automatically be closed at the end of 564 # the with statement, even if attempts to open files later 565 # in the list raise an exception. 566 """ 567 568 def __enter__(self): 569 return self 570 571 def __exit__(self, *exc_details): 572 exc = exc_details[1] 573 received_exc = exc is not None 574 575 # We manipulate the exception state so it behaves as though 576 # we were actually nesting multiple with statements 577 frame_exc = sys.exception() 578 def _fix_exception_context(new_exc, old_exc): 579 # Context may not be correct, so find the end of the chain 580 while 1: 581 exc_context = new_exc.__context__ 582 if exc_context is None or exc_context is old_exc: 583 # Context is already set correctly (see issue 20317) 584 return 585 if exc_context is frame_exc: 586 break 587 new_exc = exc_context 588 # Change the end of the chain to point to the exception 589 # we expect it to reference 590 new_exc.__context__ = old_exc 591 592 # Callbacks are invoked in LIFO order to match the behaviour of 593 # nested context managers 594 suppressed_exc = False 595 pending_raise = False 596 while self._exit_callbacks: 597 is_sync, cb = self._exit_callbacks.pop() 598 assert is_sync 599 try: 600 if exc is None: 601 exc_details = None, None, None 602 else: 603 exc_details = type(exc), exc, exc.__traceback__ 604 if cb(*exc_details): 605 suppressed_exc = True 606 pending_raise = False 607 exc = None 608 except BaseException as new_exc: 609 # simulate the stack of exceptions by setting the context 610 _fix_exception_context(new_exc, exc) 611 pending_raise = True 612 exc = new_exc 613 614 if pending_raise: 615 try: 616 # bare "raise exc" replaces our carefully 617 # set-up context 618 fixed_ctx = exc.__context__ 619 raise exc 620 except BaseException: 621 exc.__context__ = fixed_ctx 622 raise 623 return received_exc and suppressed_exc 624 625 def close(self): 626 """Immediately unwind the context stack.""" 627 self.__exit__(None, None, None) 628 629 630# Inspired by discussions on https://bugs.python.org/issue29302 631class AsyncExitStack(_BaseExitStack, AbstractAsyncContextManager): 632 """Async context manager for dynamic management of a stack of exit 633 callbacks. 634 635 For example: 636 async with AsyncExitStack() as stack: 637 connections = [await stack.enter_async_context(get_connection()) 638 for i in range(5)] 639 # All opened connections will automatically be released at the 640 # end of the async with statement, even if attempts to open a 641 # connection later in the list raise an exception. 642 """ 643 644 @staticmethod 645 def _create_async_exit_wrapper(cm, cm_exit): 646 return MethodType(cm_exit, cm) 647 648 @staticmethod 649 def _create_async_cb_wrapper(callback, /, *args, **kwds): 650 async def _exit_wrapper(exc_type, exc, tb): 651 await callback(*args, **kwds) 652 return _exit_wrapper 653 654 async def enter_async_context(self, cm): 655 """Enters the supplied async context manager. 656 657 If successful, also pushes its __aexit__ method as a callback and 658 returns the result of the __aenter__ method. 659 """ 660 cls = type(cm) 661 try: 662 _enter = cls.__aenter__ 663 _exit = cls.__aexit__ 664 except AttributeError: 665 raise TypeError(f"'{cls.__module__}.{cls.__qualname__}' object does " 666 f"not support the asynchronous context manager protocol" 667 ) from None 668 result = await _enter(cm) 669 self._push_async_cm_exit(cm, _exit) 670 return result 671 672 def push_async_exit(self, exit): 673 """Registers a coroutine function with the standard __aexit__ method 674 signature. 675 676 Can suppress exceptions the same way __aexit__ method can. 677 Also accepts any object with an __aexit__ method (registering a call 678 to the method instead of the object itself). 679 """ 680 _cb_type = type(exit) 681 try: 682 exit_method = _cb_type.__aexit__ 683 except AttributeError: 684 # Not an async context manager, so assume it's a coroutine function 685 self._push_exit_callback(exit, False) 686 else: 687 self._push_async_cm_exit(exit, exit_method) 688 return exit # Allow use as a decorator 689 690 def push_async_callback(self, callback, /, *args, **kwds): 691 """Registers an arbitrary coroutine function and arguments. 692 693 Cannot suppress exceptions. 694 """ 695 _exit_wrapper = self._create_async_cb_wrapper(callback, *args, **kwds) 696 697 # We changed the signature, so using @wraps is not appropriate, but 698 # setting __wrapped__ may still help with introspection. 699 _exit_wrapper.__wrapped__ = callback 700 self._push_exit_callback(_exit_wrapper, False) 701 return callback # Allow use as a decorator 702 703 async def aclose(self): 704 """Immediately unwind the context stack.""" 705 await self.__aexit__(None, None, None) 706 707 def _push_async_cm_exit(self, cm, cm_exit): 708 """Helper to correctly register coroutine function to __aexit__ 709 method.""" 710 _exit_wrapper = self._create_async_exit_wrapper(cm, cm_exit) 711 self._push_exit_callback(_exit_wrapper, False) 712 713 async def __aenter__(self): 714 return self 715 716 async def __aexit__(self, *exc_details): 717 exc = exc_details[1] 718 received_exc = exc is not None 719 720 # We manipulate the exception state so it behaves as though 721 # we were actually nesting multiple with statements 722 frame_exc = sys.exception() 723 def _fix_exception_context(new_exc, old_exc): 724 # Context may not be correct, so find the end of the chain 725 while 1: 726 exc_context = new_exc.__context__ 727 if exc_context is None or exc_context is old_exc: 728 # Context is already set correctly (see issue 20317) 729 return 730 if exc_context is frame_exc: 731 break 732 new_exc = exc_context 733 # Change the end of the chain to point to the exception 734 # we expect it to reference 735 new_exc.__context__ = old_exc 736 737 # Callbacks are invoked in LIFO order to match the behaviour of 738 # nested context managers 739 suppressed_exc = False 740 pending_raise = False 741 while self._exit_callbacks: 742 is_sync, cb = self._exit_callbacks.pop() 743 try: 744 if exc is None: 745 exc_details = None, None, None 746 else: 747 exc_details = type(exc), exc, exc.__traceback__ 748 if is_sync: 749 cb_suppress = cb(*exc_details) 750 else: 751 cb_suppress = await cb(*exc_details) 752 753 if cb_suppress: 754 suppressed_exc = True 755 pending_raise = False 756 exc = None 757 except BaseException as new_exc: 758 # simulate the stack of exceptions by setting the context 759 _fix_exception_context(new_exc, exc) 760 pending_raise = True 761 exc = new_exc 762 763 if pending_raise: 764 try: 765 # bare "raise exc" replaces our carefully 766 # set-up context 767 fixed_ctx = exc.__context__ 768 raise exc 769 except BaseException: 770 exc.__context__ = fixed_ctx 771 raise 772 return received_exc and suppressed_exc 773 774 775class nullcontext(AbstractContextManager, AbstractAsyncContextManager): 776 """Context manager that does no additional processing. 777 778 Used as a stand-in for a normal context manager, when a particular 779 block of code is only sometimes used with a normal context manager: 780 781 cm = optional_cm if condition else nullcontext() 782 with cm: 783 # Perform operation, using optional_cm if condition is True 784 """ 785 786 def __init__(self, enter_result=None): 787 self.enter_result = enter_result 788 789 def __enter__(self): 790 return self.enter_result 791 792 def __exit__(self, *excinfo): 793 pass 794 795 async def __aenter__(self): 796 return self.enter_result 797 798 async def __aexit__(self, *excinfo): 799 pass 800 801 802class chdir(AbstractContextManager): 803 """Non thread-safe context manager to change the current working directory.""" 804 805 def __init__(self, path): 806 self.path = path 807 self._old_cwd = [] 808 809 def __enter__(self): 810 self._old_cwd.append(os.getcwd()) 811 os.chdir(self.path) 812 813 def __exit__(self, *excinfo): 814 os.chdir(self._old_cwd.pop()) 815