1# mock.py 2# Test tools for mocking and patching. 3# Maintained by Michael Foord 4# Backport for other versions of Python available from 5# https://pypi.org/project/mock 6 7__all__ = ( 8 'Mock', 9 'MagicMock', 10 'patch', 11 'sentinel', 12 'DEFAULT', 13 'ANY', 14 'call', 15 'create_autospec', 16 'AsyncMock', 17 'ThreadingMock', 18 'FILTER_DIR', 19 'NonCallableMock', 20 'NonCallableMagicMock', 21 'mock_open', 22 'PropertyMock', 23 'seal', 24) 25 26 27import asyncio 28import contextlib 29import io 30import inspect 31import pprint 32import sys 33import builtins 34import pkgutil 35from asyncio import iscoroutinefunction 36import threading 37from types import CodeType, ModuleType, MethodType 38from unittest.util import safe_repr 39from functools import wraps, partial 40from threading import RLock 41 42 43class InvalidSpecError(Exception): 44 """Indicates that an invalid value was used as a mock spec.""" 45 46 47_builtins = {name for name in dir(builtins) if not name.startswith('_')} 48 49FILTER_DIR = True 50 51# Workaround for issue #12370 52# Without this, the __class__ properties wouldn't be set correctly 53_safe_super = super 54 55def _is_async_obj(obj): 56 if _is_instance_mock(obj) and not isinstance(obj, AsyncMock): 57 return False 58 if hasattr(obj, '__func__'): 59 obj = getattr(obj, '__func__') 60 return iscoroutinefunction(obj) or inspect.isawaitable(obj) 61 62 63def _is_async_func(func): 64 if getattr(func, '__code__', None): 65 return iscoroutinefunction(func) 66 else: 67 return False 68 69 70def _is_instance_mock(obj): 71 # can't use isinstance on Mock objects because they override __class__ 72 # The base class for all mocks is NonCallableMock 73 return issubclass(type(obj), NonCallableMock) 74 75 76def _is_exception(obj): 77 return ( 78 isinstance(obj, BaseException) or 79 isinstance(obj, type) and issubclass(obj, BaseException) 80 ) 81 82 83def _extract_mock(obj): 84 # Autospecced functions will return a FunctionType with "mock" attribute 85 # which is the actual mock object that needs to be used. 86 if isinstance(obj, FunctionTypes) and hasattr(obj, 'mock'): 87 return obj.mock 88 else: 89 return obj 90 91 92def _get_signature_object(func, as_instance, eat_self): 93 """ 94 Given an arbitrary, possibly callable object, try to create a suitable 95 signature object. 96 Return a (reduced func, signature) tuple, or None. 97 """ 98 if isinstance(func, type) and not as_instance: 99 # If it's a type and should be modelled as a type, use __init__. 100 func = func.__init__ 101 # Skip the `self` argument in __init__ 102 eat_self = True 103 elif isinstance(func, (classmethod, staticmethod)): 104 if isinstance(func, classmethod): 105 # Skip the `cls` argument of a class method 106 eat_self = True 107 # Use the original decorated method to extract the correct function signature 108 func = func.__func__ 109 elif not isinstance(func, FunctionTypes): 110 # If we really want to model an instance of the passed type, 111 # __call__ should be looked up, not __init__. 112 try: 113 func = func.__call__ 114 except AttributeError: 115 return None 116 if eat_self: 117 sig_func = partial(func, None) 118 else: 119 sig_func = func 120 try: 121 return func, inspect.signature(sig_func) 122 except ValueError: 123 # Certain callable types are not supported by inspect.signature() 124 return None 125 126 127def _check_signature(func, mock, skipfirst, instance=False): 128 sig = _get_signature_object(func, instance, skipfirst) 129 if sig is None: 130 return 131 func, sig = sig 132 def checksig(self, /, *args, **kwargs): 133 sig.bind(*args, **kwargs) 134 _copy_func_details(func, checksig) 135 type(mock)._mock_check_sig = checksig 136 type(mock).__signature__ = sig 137 138 139def _copy_func_details(func, funcopy): 140 # we explicitly don't copy func.__dict__ into this copy as it would 141 # expose original attributes that should be mocked 142 for attribute in ( 143 '__name__', '__doc__', '__text_signature__', 144 '__module__', '__defaults__', '__kwdefaults__', 145 ): 146 try: 147 setattr(funcopy, attribute, getattr(func, attribute)) 148 except AttributeError: 149 pass 150 151 152def _callable(obj): 153 if isinstance(obj, type): 154 return True 155 if isinstance(obj, (staticmethod, classmethod, MethodType)): 156 return _callable(obj.__func__) 157 if getattr(obj, '__call__', None) is not None: 158 return True 159 return False 160 161 162def _is_list(obj): 163 # checks for list or tuples 164 # XXXX badly named! 165 return type(obj) in (list, tuple) 166 167 168def _instance_callable(obj): 169 """Given an object, return True if the object is callable. 170 For classes, return True if instances would be callable.""" 171 if not isinstance(obj, type): 172 # already an instance 173 return getattr(obj, '__call__', None) is not None 174 175 # *could* be broken by a class overriding __mro__ or __dict__ via 176 # a metaclass 177 for base in (obj,) + obj.__mro__: 178 if base.__dict__.get('__call__') is not None: 179 return True 180 return False 181 182 183def _set_signature(mock, original, instance=False): 184 # creates a function with signature (*args, **kwargs) that delegates to a 185 # mock. It still does signature checking by calling a lambda with the same 186 # signature as the original. 187 188 skipfirst = isinstance(original, type) 189 result = _get_signature_object(original, instance, skipfirst) 190 if result is None: 191 return mock 192 func, sig = result 193 def checksig(*args, **kwargs): 194 sig.bind(*args, **kwargs) 195 _copy_func_details(func, checksig) 196 197 name = original.__name__ 198 if not name.isidentifier(): 199 name = 'funcopy' 200 context = {'_checksig_': checksig, 'mock': mock} 201 src = """def %s(*args, **kwargs): 202 _checksig_(*args, **kwargs) 203 return mock(*args, **kwargs)""" % name 204 exec (src, context) 205 funcopy = context[name] 206 _setup_func(funcopy, mock, sig) 207 return funcopy 208 209def _set_async_signature(mock, original, instance=False, is_async_mock=False): 210 # creates an async function with signature (*args, **kwargs) that delegates to a 211 # mock. It still does signature checking by calling a lambda with the same 212 # signature as the original. 213 214 skipfirst = isinstance(original, type) 215 func, sig = _get_signature_object(original, instance, skipfirst) 216 def checksig(*args, **kwargs): 217 sig.bind(*args, **kwargs) 218 _copy_func_details(func, checksig) 219 220 name = original.__name__ 221 context = {'_checksig_': checksig, 'mock': mock} 222 src = """async def %s(*args, **kwargs): 223 _checksig_(*args, **kwargs) 224 return await mock(*args, **kwargs)""" % name 225 exec (src, context) 226 funcopy = context[name] 227 _setup_func(funcopy, mock, sig) 228 _setup_async_mock(funcopy) 229 return funcopy 230 231 232def _setup_func(funcopy, mock, sig): 233 funcopy.mock = mock 234 235 def assert_called_with(*args, **kwargs): 236 return mock.assert_called_with(*args, **kwargs) 237 def assert_called(*args, **kwargs): 238 return mock.assert_called(*args, **kwargs) 239 def assert_not_called(*args, **kwargs): 240 return mock.assert_not_called(*args, **kwargs) 241 def assert_called_once(*args, **kwargs): 242 return mock.assert_called_once(*args, **kwargs) 243 def assert_called_once_with(*args, **kwargs): 244 return mock.assert_called_once_with(*args, **kwargs) 245 def assert_has_calls(*args, **kwargs): 246 return mock.assert_has_calls(*args, **kwargs) 247 def assert_any_call(*args, **kwargs): 248 return mock.assert_any_call(*args, **kwargs) 249 def reset_mock(): 250 funcopy.method_calls = _CallList() 251 funcopy.mock_calls = _CallList() 252 mock.reset_mock() 253 ret = funcopy.return_value 254 if _is_instance_mock(ret) and not ret is mock: 255 ret.reset_mock() 256 257 funcopy.called = False 258 funcopy.call_count = 0 259 funcopy.call_args = None 260 funcopy.call_args_list = _CallList() 261 funcopy.method_calls = _CallList() 262 funcopy.mock_calls = _CallList() 263 264 funcopy.return_value = mock.return_value 265 funcopy.side_effect = mock.side_effect 266 funcopy._mock_children = mock._mock_children 267 268 funcopy.assert_called_with = assert_called_with 269 funcopy.assert_called_once_with = assert_called_once_with 270 funcopy.assert_has_calls = assert_has_calls 271 funcopy.assert_any_call = assert_any_call 272 funcopy.reset_mock = reset_mock 273 funcopy.assert_called = assert_called 274 funcopy.assert_not_called = assert_not_called 275 funcopy.assert_called_once = assert_called_once 276 funcopy.__signature__ = sig 277 278 mock._mock_delegate = funcopy 279 280 281def _setup_async_mock(mock): 282 mock._is_coroutine = asyncio.coroutines._is_coroutine 283 mock.await_count = 0 284 mock.await_args = None 285 mock.await_args_list = _CallList() 286 287 # Mock is not configured yet so the attributes are set 288 # to a function and then the corresponding mock helper function 289 # is called when the helper is accessed similar to _setup_func. 290 def wrapper(attr, /, *args, **kwargs): 291 return getattr(mock.mock, attr)(*args, **kwargs) 292 293 for attribute in ('assert_awaited', 294 'assert_awaited_once', 295 'assert_awaited_with', 296 'assert_awaited_once_with', 297 'assert_any_await', 298 'assert_has_awaits', 299 'assert_not_awaited'): 300 301 # setattr(mock, attribute, wrapper) causes late binding 302 # hence attribute will always be the last value in the loop 303 # Use partial(wrapper, attribute) to ensure the attribute is bound 304 # correctly. 305 setattr(mock, attribute, partial(wrapper, attribute)) 306 307 308def _is_magic(name): 309 return '__%s__' % name[2:-2] == name 310 311 312class _SentinelObject(object): 313 "A unique, named, sentinel object." 314 def __init__(self, name): 315 self.name = name 316 317 def __repr__(self): 318 return 'sentinel.%s' % self.name 319 320 def __reduce__(self): 321 return 'sentinel.%s' % self.name 322 323 324class _Sentinel(object): 325 """Access attributes to return a named object, usable as a sentinel.""" 326 def __init__(self): 327 self._sentinels = {} 328 329 def __getattr__(self, name): 330 if name == '__bases__': 331 # Without this help(unittest.mock) raises an exception 332 raise AttributeError 333 return self._sentinels.setdefault(name, _SentinelObject(name)) 334 335 def __reduce__(self): 336 return 'sentinel' 337 338 339sentinel = _Sentinel() 340 341DEFAULT = sentinel.DEFAULT 342_missing = sentinel.MISSING 343_deleted = sentinel.DELETED 344 345 346_allowed_names = { 347 'return_value', '_mock_return_value', 'side_effect', 348 '_mock_side_effect', '_mock_parent', '_mock_new_parent', 349 '_mock_name', '_mock_new_name' 350} 351 352 353def _delegating_property(name): 354 _allowed_names.add(name) 355 _the_name = '_mock_' + name 356 def _get(self, name=name, _the_name=_the_name): 357 sig = self._mock_delegate 358 if sig is None: 359 return getattr(self, _the_name) 360 return getattr(sig, name) 361 def _set(self, value, name=name, _the_name=_the_name): 362 sig = self._mock_delegate 363 if sig is None: 364 self.__dict__[_the_name] = value 365 else: 366 setattr(sig, name, value) 367 368 return property(_get, _set) 369 370 371 372class _CallList(list): 373 374 def __contains__(self, value): 375 if not isinstance(value, list): 376 return list.__contains__(self, value) 377 len_value = len(value) 378 len_self = len(self) 379 if len_value > len_self: 380 return False 381 382 for i in range(0, len_self - len_value + 1): 383 sub_list = self[i:i+len_value] 384 if sub_list == value: 385 return True 386 return False 387 388 def __repr__(self): 389 return pprint.pformat(list(self)) 390 391 392def _check_and_set_parent(parent, value, name, new_name): 393 value = _extract_mock(value) 394 395 if not _is_instance_mock(value): 396 return False 397 if ((value._mock_name or value._mock_new_name) or 398 (value._mock_parent is not None) or 399 (value._mock_new_parent is not None)): 400 return False 401 402 _parent = parent 403 while _parent is not None: 404 # setting a mock (value) as a child or return value of itself 405 # should not modify the mock 406 if _parent is value: 407 return False 408 _parent = _parent._mock_new_parent 409 410 if new_name: 411 value._mock_new_parent = parent 412 value._mock_new_name = new_name 413 if name: 414 value._mock_parent = parent 415 value._mock_name = name 416 return True 417 418# Internal class to identify if we wrapped an iterator object or not. 419class _MockIter(object): 420 def __init__(self, obj): 421 self.obj = iter(obj) 422 def __next__(self): 423 return next(self.obj) 424 425class Base(object): 426 _mock_return_value = DEFAULT 427 _mock_side_effect = None 428 def __init__(self, /, *args, **kwargs): 429 pass 430 431 432 433class NonCallableMock(Base): 434 """A non-callable version of `Mock`""" 435 436 # Store a mutex as a class attribute in order to protect concurrent access 437 # to mock attributes. Using a class attribute allows all NonCallableMock 438 # instances to share the mutex for simplicity. 439 # 440 # See https://github.com/python/cpython/issues/98624 for why this is 441 # necessary. 442 _lock = RLock() 443 444 def __new__( 445 cls, spec=None, wraps=None, name=None, spec_set=None, 446 parent=None, _spec_state=None, _new_name='', _new_parent=None, 447 _spec_as_instance=False, _eat_self=None, unsafe=False, **kwargs 448 ): 449 # every instance has its own class 450 # so we can create magic methods on the 451 # class without stomping on other mocks 452 bases = (cls,) 453 if not issubclass(cls, AsyncMockMixin): 454 # Check if spec is an async object or function 455 spec_arg = spec_set or spec 456 if spec_arg is not None and _is_async_obj(spec_arg): 457 bases = (AsyncMockMixin, cls) 458 new = type(cls.__name__, bases, {'__doc__': cls.__doc__}) 459 instance = _safe_super(NonCallableMock, cls).__new__(new) 460 return instance 461 462 463 def __init__( 464 self, spec=None, wraps=None, name=None, spec_set=None, 465 parent=None, _spec_state=None, _new_name='', _new_parent=None, 466 _spec_as_instance=False, _eat_self=None, unsafe=False, **kwargs 467 ): 468 if _new_parent is None: 469 _new_parent = parent 470 471 __dict__ = self.__dict__ 472 __dict__['_mock_parent'] = parent 473 __dict__['_mock_name'] = name 474 __dict__['_mock_new_name'] = _new_name 475 __dict__['_mock_new_parent'] = _new_parent 476 __dict__['_mock_sealed'] = False 477 478 if spec_set is not None: 479 spec = spec_set 480 spec_set = True 481 if _eat_self is None: 482 _eat_self = parent is not None 483 484 self._mock_add_spec(spec, spec_set, _spec_as_instance, _eat_self) 485 486 __dict__['_mock_children'] = {} 487 __dict__['_mock_wraps'] = wraps 488 __dict__['_mock_delegate'] = None 489 490 __dict__['_mock_called'] = False 491 __dict__['_mock_call_args'] = None 492 __dict__['_mock_call_count'] = 0 493 __dict__['_mock_call_args_list'] = _CallList() 494 __dict__['_mock_mock_calls'] = _CallList() 495 496 __dict__['method_calls'] = _CallList() 497 __dict__['_mock_unsafe'] = unsafe 498 499 if kwargs: 500 self.configure_mock(**kwargs) 501 502 _safe_super(NonCallableMock, self).__init__( 503 spec, wraps, name, spec_set, parent, 504 _spec_state 505 ) 506 507 508 def attach_mock(self, mock, attribute): 509 """ 510 Attach a mock as an attribute of this one, replacing its name and 511 parent. Calls to the attached mock will be recorded in the 512 `method_calls` and `mock_calls` attributes of this one.""" 513 inner_mock = _extract_mock(mock) 514 515 inner_mock._mock_parent = None 516 inner_mock._mock_new_parent = None 517 inner_mock._mock_name = '' 518 inner_mock._mock_new_name = None 519 520 setattr(self, attribute, mock) 521 522 523 def mock_add_spec(self, spec, spec_set=False): 524 """Add a spec to a mock. `spec` can either be an object or a 525 list of strings. Only attributes on the `spec` can be fetched as 526 attributes from the mock. 527 528 If `spec_set` is True then only attributes on the spec can be set.""" 529 self._mock_add_spec(spec, spec_set) 530 531 532 def _mock_add_spec(self, spec, spec_set, _spec_as_instance=False, 533 _eat_self=False): 534 if _is_instance_mock(spec): 535 raise InvalidSpecError(f'Cannot spec a Mock object. [object={spec!r}]') 536 537 _spec_class = None 538 _spec_signature = None 539 _spec_asyncs = [] 540 541 if spec is not None and not _is_list(spec): 542 if isinstance(spec, type): 543 _spec_class = spec 544 else: 545 _spec_class = type(spec) 546 res = _get_signature_object(spec, 547 _spec_as_instance, _eat_self) 548 _spec_signature = res and res[1] 549 550 spec_list = dir(spec) 551 552 for attr in spec_list: 553 static_attr = inspect.getattr_static(spec, attr, None) 554 unwrapped_attr = static_attr 555 try: 556 unwrapped_attr = inspect.unwrap(unwrapped_attr) 557 except ValueError: 558 pass 559 if iscoroutinefunction(unwrapped_attr): 560 _spec_asyncs.append(attr) 561 562 spec = spec_list 563 564 __dict__ = self.__dict__ 565 __dict__['_spec_class'] = _spec_class 566 __dict__['_spec_set'] = spec_set 567 __dict__['_spec_signature'] = _spec_signature 568 __dict__['_mock_methods'] = spec 569 __dict__['_spec_asyncs'] = _spec_asyncs 570 571 def __get_return_value(self): 572 ret = self._mock_return_value 573 if self._mock_delegate is not None: 574 ret = self._mock_delegate.return_value 575 576 if ret is DEFAULT and self._mock_wraps is None: 577 ret = self._get_child_mock( 578 _new_parent=self, _new_name='()' 579 ) 580 self.return_value = ret 581 return ret 582 583 584 def __set_return_value(self, value): 585 if self._mock_delegate is not None: 586 self._mock_delegate.return_value = value 587 else: 588 self._mock_return_value = value 589 _check_and_set_parent(self, value, None, '()') 590 591 __return_value_doc = "The value to be returned when the mock is called." 592 return_value = property(__get_return_value, __set_return_value, 593 __return_value_doc) 594 595 596 @property 597 def __class__(self): 598 if self._spec_class is None: 599 return type(self) 600 return self._spec_class 601 602 called = _delegating_property('called') 603 call_count = _delegating_property('call_count') 604 call_args = _delegating_property('call_args') 605 call_args_list = _delegating_property('call_args_list') 606 mock_calls = _delegating_property('mock_calls') 607 608 609 def __get_side_effect(self): 610 delegated = self._mock_delegate 611 if delegated is None: 612 return self._mock_side_effect 613 sf = delegated.side_effect 614 if (sf is not None and not callable(sf) 615 and not isinstance(sf, _MockIter) and not _is_exception(sf)): 616 sf = _MockIter(sf) 617 delegated.side_effect = sf 618 return sf 619 620 def __set_side_effect(self, value): 621 value = _try_iter(value) 622 delegated = self._mock_delegate 623 if delegated is None: 624 self._mock_side_effect = value 625 else: 626 delegated.side_effect = value 627 628 side_effect = property(__get_side_effect, __set_side_effect) 629 630 631 def reset_mock(self, visited=None, *, return_value=False, side_effect=False): 632 "Restore the mock object to its initial state." 633 if visited is None: 634 visited = [] 635 if id(self) in visited: 636 return 637 visited.append(id(self)) 638 639 self.called = False 640 self.call_args = None 641 self.call_count = 0 642 self.mock_calls = _CallList() 643 self.call_args_list = _CallList() 644 self.method_calls = _CallList() 645 646 if return_value: 647 self._mock_return_value = DEFAULT 648 if side_effect: 649 self._mock_side_effect = None 650 651 for child in self._mock_children.values(): 652 if isinstance(child, _SpecState) or child is _deleted: 653 continue 654 child.reset_mock(visited, return_value=return_value, side_effect=side_effect) 655 656 ret = self._mock_return_value 657 if _is_instance_mock(ret) and ret is not self: 658 ret.reset_mock(visited) 659 660 661 def configure_mock(self, /, **kwargs): 662 """Set attributes on the mock through keyword arguments. 663 664 Attributes plus return values and side effects can be set on child 665 mocks using standard dot notation and unpacking a dictionary in the 666 method call: 667 668 >>> attrs = {'method.return_value': 3, 'other.side_effect': KeyError} 669 >>> mock.configure_mock(**attrs)""" 670 for arg, val in sorted(kwargs.items(), 671 # we sort on the number of dots so that 672 # attributes are set before we set attributes on 673 # attributes 674 key=lambda entry: entry[0].count('.')): 675 args = arg.split('.') 676 final = args.pop() 677 obj = self 678 for entry in args: 679 obj = getattr(obj, entry) 680 setattr(obj, final, val) 681 682 683 def __getattr__(self, name): 684 if name in {'_mock_methods', '_mock_unsafe'}: 685 raise AttributeError(name) 686 elif self._mock_methods is not None: 687 if name not in self._mock_methods or name in _all_magics: 688 raise AttributeError("Mock object has no attribute %r" % name) 689 elif _is_magic(name): 690 raise AttributeError(name) 691 if not self._mock_unsafe and (not self._mock_methods or name not in self._mock_methods): 692 if name.startswith(('assert', 'assret', 'asert', 'aseert', 'assrt')) or name in _ATTRIB_DENY_LIST: 693 raise AttributeError( 694 f"{name!r} is not a valid assertion. Use a spec " 695 f"for the mock if {name!r} is meant to be an attribute.") 696 697 with NonCallableMock._lock: 698 result = self._mock_children.get(name) 699 if result is _deleted: 700 raise AttributeError(name) 701 elif result is None: 702 wraps = None 703 if self._mock_wraps is not None: 704 # XXXX should we get the attribute without triggering code 705 # execution? 706 wraps = getattr(self._mock_wraps, name) 707 708 result = self._get_child_mock( 709 parent=self, name=name, wraps=wraps, _new_name=name, 710 _new_parent=self 711 ) 712 self._mock_children[name] = result 713 714 elif isinstance(result, _SpecState): 715 try: 716 result = create_autospec( 717 result.spec, result.spec_set, result.instance, 718 result.parent, result.name 719 ) 720 except InvalidSpecError: 721 target_name = self.__dict__['_mock_name'] or self 722 raise InvalidSpecError( 723 f'Cannot autospec attr {name!r} from target ' 724 f'{target_name!r} as it has already been mocked out. ' 725 f'[target={self!r}, attr={result.spec!r}]') 726 self._mock_children[name] = result 727 728 return result 729 730 731 def _extract_mock_name(self): 732 _name_list = [self._mock_new_name] 733 _parent = self._mock_new_parent 734 last = self 735 736 dot = '.' 737 if _name_list == ['()']: 738 dot = '' 739 740 while _parent is not None: 741 last = _parent 742 743 _name_list.append(_parent._mock_new_name + dot) 744 dot = '.' 745 if _parent._mock_new_name == '()': 746 dot = '' 747 748 _parent = _parent._mock_new_parent 749 750 _name_list = list(reversed(_name_list)) 751 _first = last._mock_name or 'mock' 752 if len(_name_list) > 1: 753 if _name_list[1] not in ('()', '().'): 754 _first += '.' 755 _name_list[0] = _first 756 return ''.join(_name_list) 757 758 def __repr__(self): 759 name = self._extract_mock_name() 760 761 name_string = '' 762 if name not in ('mock', 'mock.'): 763 name_string = ' name=%r' % name 764 765 spec_string = '' 766 if self._spec_class is not None: 767 spec_string = ' spec=%r' 768 if self._spec_set: 769 spec_string = ' spec_set=%r' 770 spec_string = spec_string % self._spec_class.__name__ 771 return "<%s%s%s id='%s'>" % ( 772 type(self).__name__, 773 name_string, 774 spec_string, 775 id(self) 776 ) 777 778 779 def __dir__(self): 780 """Filter the output of `dir(mock)` to only useful members.""" 781 if not FILTER_DIR: 782 return object.__dir__(self) 783 784 extras = self._mock_methods or [] 785 from_type = dir(type(self)) 786 from_dict = list(self.__dict__) 787 from_child_mocks = [ 788 m_name for m_name, m_value in self._mock_children.items() 789 if m_value is not _deleted] 790 791 from_type = [e for e in from_type if not e.startswith('_')] 792 from_dict = [e for e in from_dict if not e.startswith('_') or 793 _is_magic(e)] 794 return sorted(set(extras + from_type + from_dict + from_child_mocks)) 795 796 797 def __setattr__(self, name, value): 798 if name in _allowed_names: 799 # property setters go through here 800 return object.__setattr__(self, name, value) 801 elif (self._spec_set and self._mock_methods is not None and 802 name not in self._mock_methods and 803 name not in self.__dict__): 804 raise AttributeError("Mock object has no attribute '%s'" % name) 805 elif name in _unsupported_magics: 806 msg = 'Attempting to set unsupported magic method %r.' % name 807 raise AttributeError(msg) 808 elif name in _all_magics: 809 if self._mock_methods is not None and name not in self._mock_methods: 810 raise AttributeError("Mock object has no attribute '%s'" % name) 811 812 if not _is_instance_mock(value): 813 setattr(type(self), name, _get_method(name, value)) 814 original = value 815 value = lambda *args, **kw: original(self, *args, **kw) 816 else: 817 # only set _new_name and not name so that mock_calls is tracked 818 # but not method calls 819 _check_and_set_parent(self, value, None, name) 820 setattr(type(self), name, value) 821 self._mock_children[name] = value 822 elif name == '__class__': 823 self._spec_class = value 824 return 825 else: 826 if _check_and_set_parent(self, value, name, name): 827 self._mock_children[name] = value 828 829 if self._mock_sealed and not hasattr(self, name): 830 mock_name = f'{self._extract_mock_name()}.{name}' 831 raise AttributeError(f'Cannot set {mock_name}') 832 833 if isinstance(value, PropertyMock): 834 self.__dict__[name] = value 835 return 836 return object.__setattr__(self, name, value) 837 838 839 def __delattr__(self, name): 840 if name in _all_magics and name in type(self).__dict__: 841 delattr(type(self), name) 842 if name not in self.__dict__: 843 # for magic methods that are still MagicProxy objects and 844 # not set on the instance itself 845 return 846 847 obj = self._mock_children.get(name, _missing) 848 if name in self.__dict__: 849 _safe_super(NonCallableMock, self).__delattr__(name) 850 elif obj is _deleted: 851 raise AttributeError(name) 852 if obj is not _missing: 853 del self._mock_children[name] 854 self._mock_children[name] = _deleted 855 856 857 def _format_mock_call_signature(self, args, kwargs): 858 name = self._mock_name or 'mock' 859 return _format_call_signature(name, args, kwargs) 860 861 862 def _format_mock_failure_message(self, args, kwargs, action='call'): 863 message = 'expected %s not found.\nExpected: %s\n Actual: %s' 864 expected_string = self._format_mock_call_signature(args, kwargs) 865 call_args = self.call_args 866 actual_string = self._format_mock_call_signature(*call_args) 867 return message % (action, expected_string, actual_string) 868 869 870 def _get_call_signature_from_name(self, name): 871 """ 872 * If call objects are asserted against a method/function like obj.meth1 873 then there could be no name for the call object to lookup. Hence just 874 return the spec_signature of the method/function being asserted against. 875 * If the name is not empty then remove () and split by '.' to get 876 list of names to iterate through the children until a potential 877 match is found. A child mock is created only during attribute access 878 so if we get a _SpecState then no attributes of the spec were accessed 879 and can be safely exited. 880 """ 881 if not name: 882 return self._spec_signature 883 884 sig = None 885 names = name.replace('()', '').split('.') 886 children = self._mock_children 887 888 for name in names: 889 child = children.get(name) 890 if child is None or isinstance(child, _SpecState): 891 break 892 else: 893 # If an autospecced object is attached using attach_mock the 894 # child would be a function with mock object as attribute from 895 # which signature has to be derived. 896 child = _extract_mock(child) 897 children = child._mock_children 898 sig = child._spec_signature 899 900 return sig 901 902 903 def _call_matcher(self, _call): 904 """ 905 Given a call (or simply an (args, kwargs) tuple), return a 906 comparison key suitable for matching with other calls. 907 This is a best effort method which relies on the spec's signature, 908 if available, or falls back on the arguments themselves. 909 """ 910 911 if isinstance(_call, tuple) and len(_call) > 2: 912 sig = self._get_call_signature_from_name(_call[0]) 913 else: 914 sig = self._spec_signature 915 916 if sig is not None: 917 if len(_call) == 2: 918 name = '' 919 args, kwargs = _call 920 else: 921 name, args, kwargs = _call 922 try: 923 bound_call = sig.bind(*args, **kwargs) 924 return call(name, bound_call.args, bound_call.kwargs) 925 except TypeError as e: 926 return e.with_traceback(None) 927 else: 928 return _call 929 930 def assert_not_called(self): 931 """assert that the mock was never called. 932 """ 933 if self.call_count != 0: 934 msg = ("Expected '%s' to not have been called. Called %s times.%s" 935 % (self._mock_name or 'mock', 936 self.call_count, 937 self._calls_repr())) 938 raise AssertionError(msg) 939 940 def assert_called(self): 941 """assert that the mock was called at least once 942 """ 943 if self.call_count == 0: 944 msg = ("Expected '%s' to have been called." % 945 (self._mock_name or 'mock')) 946 raise AssertionError(msg) 947 948 def assert_called_once(self): 949 """assert that the mock was called only once. 950 """ 951 if not self.call_count == 1: 952 msg = ("Expected '%s' to have been called once. Called %s times.%s" 953 % (self._mock_name or 'mock', 954 self.call_count, 955 self._calls_repr())) 956 raise AssertionError(msg) 957 958 def assert_called_with(self, /, *args, **kwargs): 959 """assert that the last call was made with the specified arguments. 960 961 Raises an AssertionError if the args and keyword args passed in are 962 different to the last call to the mock.""" 963 if self.call_args is None: 964 expected = self._format_mock_call_signature(args, kwargs) 965 actual = 'not called.' 966 error_message = ('expected call not found.\nExpected: %s\n Actual: %s' 967 % (expected, actual)) 968 raise AssertionError(error_message) 969 970 def _error_message(): 971 msg = self._format_mock_failure_message(args, kwargs) 972 return msg 973 expected = self._call_matcher(_Call((args, kwargs), two=True)) 974 actual = self._call_matcher(self.call_args) 975 if actual != expected: 976 cause = expected if isinstance(expected, Exception) else None 977 raise AssertionError(_error_message()) from cause 978 979 980 def assert_called_once_with(self, /, *args, **kwargs): 981 """assert that the mock was called exactly once and that that call was 982 with the specified arguments.""" 983 if not self.call_count == 1: 984 msg = ("Expected '%s' to be called once. Called %s times.%s" 985 % (self._mock_name or 'mock', 986 self.call_count, 987 self._calls_repr())) 988 raise AssertionError(msg) 989 return self.assert_called_with(*args, **kwargs) 990 991 992 def assert_has_calls(self, calls, any_order=False): 993 """assert the mock has been called with the specified calls. 994 The `mock_calls` list is checked for the calls. 995 996 If `any_order` is False (the default) then the calls must be 997 sequential. There can be extra calls before or after the 998 specified calls. 999 1000 If `any_order` is True then the calls can be in any order, but 1001 they must all appear in `mock_calls`.""" 1002 expected = [self._call_matcher(c) for c in calls] 1003 cause = next((e for e in expected if isinstance(e, Exception)), None) 1004 all_calls = _CallList(self._call_matcher(c) for c in self.mock_calls) 1005 if not any_order: 1006 if expected not in all_calls: 1007 if cause is None: 1008 problem = 'Calls not found.' 1009 else: 1010 problem = ('Error processing expected calls.\n' 1011 'Errors: {}').format( 1012 [e if isinstance(e, Exception) else None 1013 for e in expected]) 1014 raise AssertionError( 1015 f'{problem}\n' 1016 f'Expected: {_CallList(calls)}\n' 1017 f' Actual: {safe_repr(self.mock_calls)}' 1018 ) from cause 1019 return 1020 1021 all_calls = list(all_calls) 1022 1023 not_found = [] 1024 for kall in expected: 1025 try: 1026 all_calls.remove(kall) 1027 except ValueError: 1028 not_found.append(kall) 1029 if not_found: 1030 raise AssertionError( 1031 '%r does not contain all of %r in its call list, ' 1032 'found %r instead' % (self._mock_name or 'mock', 1033 tuple(not_found), all_calls) 1034 ) from cause 1035 1036 1037 def assert_any_call(self, /, *args, **kwargs): 1038 """assert the mock has been called with the specified arguments. 1039 1040 The assert passes if the mock has *ever* been called, unlike 1041 `assert_called_with` and `assert_called_once_with` that only pass if 1042 the call is the most recent one.""" 1043 expected = self._call_matcher(_Call((args, kwargs), two=True)) 1044 cause = expected if isinstance(expected, Exception) else None 1045 actual = [self._call_matcher(c) for c in self.call_args_list] 1046 if cause or expected not in _AnyComparer(actual): 1047 expected_string = self._format_mock_call_signature(args, kwargs) 1048 raise AssertionError( 1049 '%s call not found' % expected_string 1050 ) from cause 1051 1052 1053 def _get_child_mock(self, /, **kw): 1054 """Create the child mocks for attributes and return value. 1055 By default child mocks will be the same type as the parent. 1056 Subclasses of Mock may want to override this to customize the way 1057 child mocks are made. 1058 1059 For non-callable mocks the callable variant will be used (rather than 1060 any custom subclass).""" 1061 if self._mock_sealed: 1062 attribute = f".{kw['name']}" if "name" in kw else "()" 1063 mock_name = self._extract_mock_name() + attribute 1064 raise AttributeError(mock_name) 1065 1066 _new_name = kw.get("_new_name") 1067 if _new_name in self.__dict__['_spec_asyncs']: 1068 return AsyncMock(**kw) 1069 1070 _type = type(self) 1071 if issubclass(_type, MagicMock) and _new_name in _async_method_magics: 1072 # Any asynchronous magic becomes an AsyncMock 1073 klass = AsyncMock 1074 elif issubclass(_type, AsyncMockMixin): 1075 if (_new_name in _all_sync_magics or 1076 self._mock_methods and _new_name in self._mock_methods): 1077 # Any synchronous method on AsyncMock becomes a MagicMock 1078 klass = MagicMock 1079 else: 1080 klass = AsyncMock 1081 elif not issubclass(_type, CallableMixin): 1082 if issubclass(_type, NonCallableMagicMock): 1083 klass = MagicMock 1084 elif issubclass(_type, NonCallableMock): 1085 klass = Mock 1086 else: 1087 klass = _type.__mro__[1] 1088 return klass(**kw) 1089 1090 1091 def _calls_repr(self): 1092 """Renders self.mock_calls as a string. 1093 1094 Example: "\nCalls: [call(1), call(2)]." 1095 1096 If self.mock_calls is empty, an empty string is returned. The 1097 output will be truncated if very long. 1098 """ 1099 if not self.mock_calls: 1100 return "" 1101 return f"\nCalls: {safe_repr(self.mock_calls)}." 1102 1103 1104# Denylist for forbidden attribute names in safe mode 1105_ATTRIB_DENY_LIST = frozenset({ 1106 name.removeprefix("assert_") 1107 for name in dir(NonCallableMock) 1108 if name.startswith("assert_") 1109}) 1110 1111 1112class _AnyComparer(list): 1113 """A list which checks if it contains a call which may have an 1114 argument of ANY, flipping the components of item and self from 1115 their traditional locations so that ANY is guaranteed to be on 1116 the left.""" 1117 def __contains__(self, item): 1118 for _call in self: 1119 assert len(item) == len(_call) 1120 if all([ 1121 expected == actual 1122 for expected, actual in zip(item, _call) 1123 ]): 1124 return True 1125 return False 1126 1127 1128def _try_iter(obj): 1129 if obj is None: 1130 return obj 1131 if _is_exception(obj): 1132 return obj 1133 if _callable(obj): 1134 return obj 1135 try: 1136 return iter(obj) 1137 except TypeError: 1138 # XXXX backwards compatibility 1139 # but this will blow up on first call - so maybe we should fail early? 1140 return obj 1141 1142 1143class CallableMixin(Base): 1144 1145 def __init__(self, spec=None, side_effect=None, return_value=DEFAULT, 1146 wraps=None, name=None, spec_set=None, parent=None, 1147 _spec_state=None, _new_name='', _new_parent=None, **kwargs): 1148 self.__dict__['_mock_return_value'] = return_value 1149 _safe_super(CallableMixin, self).__init__( 1150 spec, wraps, name, spec_set, parent, 1151 _spec_state, _new_name, _new_parent, **kwargs 1152 ) 1153 1154 self.side_effect = side_effect 1155 1156 1157 def _mock_check_sig(self, /, *args, **kwargs): 1158 # stub method that can be replaced with one with a specific signature 1159 pass 1160 1161 1162 def __call__(self, /, *args, **kwargs): 1163 # can't use self in-case a function / method we are mocking uses self 1164 # in the signature 1165 self._mock_check_sig(*args, **kwargs) 1166 self._increment_mock_call(*args, **kwargs) 1167 return self._mock_call(*args, **kwargs) 1168 1169 1170 def _mock_call(self, /, *args, **kwargs): 1171 return self._execute_mock_call(*args, **kwargs) 1172 1173 def _increment_mock_call(self, /, *args, **kwargs): 1174 self.called = True 1175 self.call_count += 1 1176 1177 # handle call_args 1178 # needs to be set here so assertions on call arguments pass before 1179 # execution in the case of awaited calls 1180 _call = _Call((args, kwargs), two=True) 1181 self.call_args = _call 1182 self.call_args_list.append(_call) 1183 1184 # initial stuff for method_calls: 1185 do_method_calls = self._mock_parent is not None 1186 method_call_name = self._mock_name 1187 1188 # initial stuff for mock_calls: 1189 mock_call_name = self._mock_new_name 1190 is_a_call = mock_call_name == '()' 1191 self.mock_calls.append(_Call(('', args, kwargs))) 1192 1193 # follow up the chain of mocks: 1194 _new_parent = self._mock_new_parent 1195 while _new_parent is not None: 1196 1197 # handle method_calls: 1198 if do_method_calls: 1199 _new_parent.method_calls.append(_Call((method_call_name, args, kwargs))) 1200 do_method_calls = _new_parent._mock_parent is not None 1201 if do_method_calls: 1202 method_call_name = _new_parent._mock_name + '.' + method_call_name 1203 1204 # handle mock_calls: 1205 this_mock_call = _Call((mock_call_name, args, kwargs)) 1206 _new_parent.mock_calls.append(this_mock_call) 1207 1208 if _new_parent._mock_new_name: 1209 if is_a_call: 1210 dot = '' 1211 else: 1212 dot = '.' 1213 is_a_call = _new_parent._mock_new_name == '()' 1214 mock_call_name = _new_parent._mock_new_name + dot + mock_call_name 1215 1216 # follow the parental chain: 1217 _new_parent = _new_parent._mock_new_parent 1218 1219 def _execute_mock_call(self, /, *args, **kwargs): 1220 # separate from _increment_mock_call so that awaited functions are 1221 # executed separately from their call, also AsyncMock overrides this method 1222 1223 effect = self.side_effect 1224 if effect is not None: 1225 if _is_exception(effect): 1226 raise effect 1227 elif not _callable(effect): 1228 result = next(effect) 1229 if _is_exception(result): 1230 raise result 1231 else: 1232 result = effect(*args, **kwargs) 1233 1234 if result is not DEFAULT: 1235 return result 1236 1237 if self._mock_return_value is not DEFAULT: 1238 return self.return_value 1239 1240 if self._mock_delegate and self._mock_delegate.return_value is not DEFAULT: 1241 return self.return_value 1242 1243 if self._mock_wraps is not None: 1244 return self._mock_wraps(*args, **kwargs) 1245 1246 return self.return_value 1247 1248 1249 1250class Mock(CallableMixin, NonCallableMock): 1251 """ 1252 Create a new `Mock` object. `Mock` takes several optional arguments 1253 that specify the behaviour of the Mock object: 1254 1255 * `spec`: This can be either a list of strings or an existing object (a 1256 class or instance) that acts as the specification for the mock object. If 1257 you pass in an object then a list of strings is formed by calling dir on 1258 the object (excluding unsupported magic attributes and methods). Accessing 1259 any attribute not in this list will raise an `AttributeError`. 1260 1261 If `spec` is an object (rather than a list of strings) then 1262 `mock.__class__` returns the class of the spec object. This allows mocks 1263 to pass `isinstance` tests. 1264 1265 * `spec_set`: A stricter variant of `spec`. If used, attempting to *set* 1266 or get an attribute on the mock that isn't on the object passed as 1267 `spec_set` will raise an `AttributeError`. 1268 1269 * `side_effect`: A function to be called whenever the Mock is called. See 1270 the `side_effect` attribute. Useful for raising exceptions or 1271 dynamically changing return values. The function is called with the same 1272 arguments as the mock, and unless it returns `DEFAULT`, the return 1273 value of this function is used as the return value. 1274 1275 If `side_effect` is an iterable then each call to the mock will return 1276 the next value from the iterable. If any of the members of the iterable 1277 are exceptions they will be raised instead of returned. 1278 1279 * `return_value`: The value returned when the mock is called. By default 1280 this is a new Mock (created on first access). See the 1281 `return_value` attribute. 1282 1283 * `unsafe`: By default, accessing any attribute whose name starts with 1284 *assert*, *assret*, *asert*, *aseert*, or *assrt* raises an AttributeError. 1285 Additionally, an AttributeError is raised when accessing 1286 attributes that match the name of an assertion method without the prefix 1287 `assert_`, e.g. accessing `called_once` instead of `assert_called_once`. 1288 Passing `unsafe=True` will allow access to these attributes. 1289 1290 * `wraps`: Item for the mock object to wrap. If `wraps` is not None then 1291 calling the Mock will pass the call through to the wrapped object 1292 (returning the real result). Attribute access on the mock will return a 1293 Mock object that wraps the corresponding attribute of the wrapped object 1294 (so attempting to access an attribute that doesn't exist will raise an 1295 `AttributeError`). 1296 1297 If the mock has an explicit `return_value` set then calls are not passed 1298 to the wrapped object and the `return_value` is returned instead. 1299 1300 * `name`: If the mock has a name then it will be used in the repr of the 1301 mock. This can be useful for debugging. The name is propagated to child 1302 mocks. 1303 1304 Mocks can also be called with arbitrary keyword arguments. These will be 1305 used to set attributes on the mock after it is created. 1306 """ 1307 1308 1309# _check_spec_arg_typos takes kwargs from commands like patch and checks that 1310# they don't contain common misspellings of arguments related to autospeccing. 1311def _check_spec_arg_typos(kwargs_to_check): 1312 typos = ("autospect", "auto_spec", "set_spec") 1313 for typo in typos: 1314 if typo in kwargs_to_check: 1315 raise RuntimeError( 1316 f"{typo!r} might be a typo; use unsafe=True if this is intended" 1317 ) 1318 1319 1320class _patch(object): 1321 1322 attribute_name = None 1323 _active_patches = [] 1324 1325 def __init__( 1326 self, getter, attribute, new, spec, create, 1327 spec_set, autospec, new_callable, kwargs, *, unsafe=False 1328 ): 1329 if new_callable is not None: 1330 if new is not DEFAULT: 1331 raise ValueError( 1332 "Cannot use 'new' and 'new_callable' together" 1333 ) 1334 if autospec is not None: 1335 raise ValueError( 1336 "Cannot use 'autospec' and 'new_callable' together" 1337 ) 1338 if not unsafe: 1339 _check_spec_arg_typos(kwargs) 1340 if _is_instance_mock(spec): 1341 raise InvalidSpecError( 1342 f'Cannot spec attr {attribute!r} as the spec ' 1343 f'has already been mocked out. [spec={spec!r}]') 1344 if _is_instance_mock(spec_set): 1345 raise InvalidSpecError( 1346 f'Cannot spec attr {attribute!r} as the spec_set ' 1347 f'target has already been mocked out. [spec_set={spec_set!r}]') 1348 1349 self.getter = getter 1350 self.attribute = attribute 1351 self.new = new 1352 self.new_callable = new_callable 1353 self.spec = spec 1354 self.create = create 1355 self.has_local = False 1356 self.spec_set = spec_set 1357 self.autospec = autospec 1358 self.kwargs = kwargs 1359 self.additional_patchers = [] 1360 self.is_started = False 1361 1362 1363 def copy(self): 1364 patcher = _patch( 1365 self.getter, self.attribute, self.new, self.spec, 1366 self.create, self.spec_set, 1367 self.autospec, self.new_callable, self.kwargs 1368 ) 1369 patcher.attribute_name = self.attribute_name 1370 patcher.additional_patchers = [ 1371 p.copy() for p in self.additional_patchers 1372 ] 1373 return patcher 1374 1375 1376 def __call__(self, func): 1377 if isinstance(func, type): 1378 return self.decorate_class(func) 1379 if inspect.iscoroutinefunction(func): 1380 return self.decorate_async_callable(func) 1381 return self.decorate_callable(func) 1382 1383 1384 def decorate_class(self, klass): 1385 for attr in dir(klass): 1386 if not attr.startswith(patch.TEST_PREFIX): 1387 continue 1388 1389 attr_value = getattr(klass, attr) 1390 if not hasattr(attr_value, "__call__"): 1391 continue 1392 1393 patcher = self.copy() 1394 setattr(klass, attr, patcher(attr_value)) 1395 return klass 1396 1397 1398 @contextlib.contextmanager 1399 def decoration_helper(self, patched, args, keywargs): 1400 extra_args = [] 1401 with contextlib.ExitStack() as exit_stack: 1402 for patching in patched.patchings: 1403 arg = exit_stack.enter_context(patching) 1404 if patching.attribute_name is not None: 1405 keywargs.update(arg) 1406 elif patching.new is DEFAULT: 1407 extra_args.append(arg) 1408 1409 args += tuple(extra_args) 1410 yield (args, keywargs) 1411 1412 1413 def decorate_callable(self, func): 1414 # NB. Keep the method in sync with decorate_async_callable() 1415 if hasattr(func, 'patchings'): 1416 func.patchings.append(self) 1417 return func 1418 1419 @wraps(func) 1420 def patched(*args, **keywargs): 1421 with self.decoration_helper(patched, 1422 args, 1423 keywargs) as (newargs, newkeywargs): 1424 return func(*newargs, **newkeywargs) 1425 1426 patched.patchings = [self] 1427 return patched 1428 1429 1430 def decorate_async_callable(self, func): 1431 # NB. Keep the method in sync with decorate_callable() 1432 if hasattr(func, 'patchings'): 1433 func.patchings.append(self) 1434 return func 1435 1436 @wraps(func) 1437 async def patched(*args, **keywargs): 1438 with self.decoration_helper(patched, 1439 args, 1440 keywargs) as (newargs, newkeywargs): 1441 return await func(*newargs, **newkeywargs) 1442 1443 patched.patchings = [self] 1444 return patched 1445 1446 1447 def get_original(self): 1448 target = self.getter() 1449 name = self.attribute 1450 1451 original = DEFAULT 1452 local = False 1453 1454 try: 1455 original = target.__dict__[name] 1456 except (AttributeError, KeyError): 1457 original = getattr(target, name, DEFAULT) 1458 else: 1459 local = True 1460 1461 if name in _builtins and isinstance(target, ModuleType): 1462 self.create = True 1463 1464 if not self.create and original is DEFAULT: 1465 raise AttributeError( 1466 "%s does not have the attribute %r" % (target, name) 1467 ) 1468 return original, local 1469 1470 1471 def __enter__(self): 1472 """Perform the patch.""" 1473 if self.is_started: 1474 raise RuntimeError("Patch is already started") 1475 1476 new, spec, spec_set = self.new, self.spec, self.spec_set 1477 autospec, kwargs = self.autospec, self.kwargs 1478 new_callable = self.new_callable 1479 self.target = self.getter() 1480 1481 # normalise False to None 1482 if spec is False: 1483 spec = None 1484 if spec_set is False: 1485 spec_set = None 1486 if autospec is False: 1487 autospec = None 1488 1489 if spec is not None and autospec is not None: 1490 raise TypeError("Can't specify spec and autospec") 1491 if ((spec is not None or autospec is not None) and 1492 spec_set not in (True, None)): 1493 raise TypeError("Can't provide explicit spec_set *and* spec or autospec") 1494 1495 original, local = self.get_original() 1496 1497 if new is DEFAULT and autospec is None: 1498 inherit = False 1499 if spec is True: 1500 # set spec to the object we are replacing 1501 spec = original 1502 if spec_set is True: 1503 spec_set = original 1504 spec = None 1505 elif spec is not None: 1506 if spec_set is True: 1507 spec_set = spec 1508 spec = None 1509 elif spec_set is True: 1510 spec_set = original 1511 1512 if spec is not None or spec_set is not None: 1513 if original is DEFAULT: 1514 raise TypeError("Can't use 'spec' with create=True") 1515 if isinstance(original, type): 1516 # If we're patching out a class and there is a spec 1517 inherit = True 1518 1519 # Determine the Klass to use 1520 if new_callable is not None: 1521 Klass = new_callable 1522 elif spec is None and _is_async_obj(original): 1523 Klass = AsyncMock 1524 elif spec is not None or spec_set is not None: 1525 this_spec = spec 1526 if spec_set is not None: 1527 this_spec = spec_set 1528 if _is_list(this_spec): 1529 not_callable = '__call__' not in this_spec 1530 else: 1531 not_callable = not callable(this_spec) 1532 if _is_async_obj(this_spec): 1533 Klass = AsyncMock 1534 elif not_callable: 1535 Klass = NonCallableMagicMock 1536 else: 1537 Klass = MagicMock 1538 else: 1539 Klass = MagicMock 1540 1541 _kwargs = {} 1542 if spec is not None: 1543 _kwargs['spec'] = spec 1544 if spec_set is not None: 1545 _kwargs['spec_set'] = spec_set 1546 1547 # add a name to mocks 1548 if (isinstance(Klass, type) and 1549 issubclass(Klass, NonCallableMock) and self.attribute): 1550 _kwargs['name'] = self.attribute 1551 1552 _kwargs.update(kwargs) 1553 new = Klass(**_kwargs) 1554 1555 if inherit and _is_instance_mock(new): 1556 # we can only tell if the instance should be callable if the 1557 # spec is not a list 1558 this_spec = spec 1559 if spec_set is not None: 1560 this_spec = spec_set 1561 if (not _is_list(this_spec) and not 1562 _instance_callable(this_spec)): 1563 Klass = NonCallableMagicMock 1564 1565 _kwargs.pop('name') 1566 new.return_value = Klass(_new_parent=new, _new_name='()', 1567 **_kwargs) 1568 elif autospec is not None: 1569 # spec is ignored, new *must* be default, spec_set is treated 1570 # as a boolean. Should we check spec is not None and that spec_set 1571 # is a bool? 1572 if new is not DEFAULT: 1573 raise TypeError( 1574 "autospec creates the mock for you. Can't specify " 1575 "autospec and new." 1576 ) 1577 if original is DEFAULT: 1578 raise TypeError("Can't use 'autospec' with create=True") 1579 spec_set = bool(spec_set) 1580 if autospec is True: 1581 autospec = original 1582 1583 if _is_instance_mock(self.target): 1584 raise InvalidSpecError( 1585 f'Cannot autospec attr {self.attribute!r} as the patch ' 1586 f'target has already been mocked out. ' 1587 f'[target={self.target!r}, attr={autospec!r}]') 1588 if _is_instance_mock(autospec): 1589 target_name = getattr(self.target, '__name__', self.target) 1590 raise InvalidSpecError( 1591 f'Cannot autospec attr {self.attribute!r} from target ' 1592 f'{target_name!r} as it has already been mocked out. ' 1593 f'[target={self.target!r}, attr={autospec!r}]') 1594 1595 new = create_autospec(autospec, spec_set=spec_set, 1596 _name=self.attribute, **kwargs) 1597 elif kwargs: 1598 # can't set keyword args when we aren't creating the mock 1599 # XXXX If new is a Mock we could call new.configure_mock(**kwargs) 1600 raise TypeError("Can't pass kwargs to a mock we aren't creating") 1601 1602 new_attr = new 1603 1604 self.temp_original = original 1605 self.is_local = local 1606 self._exit_stack = contextlib.ExitStack() 1607 self.is_started = True 1608 try: 1609 setattr(self.target, self.attribute, new_attr) 1610 if self.attribute_name is not None: 1611 extra_args = {} 1612 if self.new is DEFAULT: 1613 extra_args[self.attribute_name] = new 1614 for patching in self.additional_patchers: 1615 arg = self._exit_stack.enter_context(patching) 1616 if patching.new is DEFAULT: 1617 extra_args.update(arg) 1618 return extra_args 1619 1620 return new 1621 except: 1622 if not self.__exit__(*sys.exc_info()): 1623 raise 1624 1625 def __exit__(self, *exc_info): 1626 """Undo the patch.""" 1627 if not self.is_started: 1628 return 1629 1630 if self.is_local and self.temp_original is not DEFAULT: 1631 setattr(self.target, self.attribute, self.temp_original) 1632 else: 1633 delattr(self.target, self.attribute) 1634 if not self.create and (not hasattr(self.target, self.attribute) or 1635 self.attribute in ('__doc__', '__module__', 1636 '__defaults__', '__annotations__', 1637 '__kwdefaults__')): 1638 # needed for proxy objects like django settings 1639 setattr(self.target, self.attribute, self.temp_original) 1640 1641 del self.temp_original 1642 del self.is_local 1643 del self.target 1644 exit_stack = self._exit_stack 1645 del self._exit_stack 1646 self.is_started = False 1647 return exit_stack.__exit__(*exc_info) 1648 1649 1650 def start(self): 1651 """Activate a patch, returning any created mock.""" 1652 result = self.__enter__() 1653 self._active_patches.append(self) 1654 return result 1655 1656 1657 def stop(self): 1658 """Stop an active patch.""" 1659 try: 1660 self._active_patches.remove(self) 1661 except ValueError: 1662 # If the patch hasn't been started this will fail 1663 return None 1664 1665 return self.__exit__(None, None, None) 1666 1667 1668 1669def _get_target(target): 1670 try: 1671 target, attribute = target.rsplit('.', 1) 1672 except (TypeError, ValueError, AttributeError): 1673 raise TypeError( 1674 f"Need a valid target to patch. You supplied: {target!r}") 1675 return partial(pkgutil.resolve_name, target), attribute 1676 1677 1678def _patch_object( 1679 target, attribute, new=DEFAULT, spec=None, 1680 create=False, spec_set=None, autospec=None, 1681 new_callable=None, *, unsafe=False, **kwargs 1682 ): 1683 """ 1684 patch the named member (`attribute`) on an object (`target`) with a mock 1685 object. 1686 1687 `patch.object` can be used as a decorator, class decorator or a context 1688 manager. Arguments `new`, `spec`, `create`, `spec_set`, 1689 `autospec` and `new_callable` have the same meaning as for `patch`. Like 1690 `patch`, `patch.object` takes arbitrary keyword arguments for configuring 1691 the mock object it creates. 1692 1693 When used as a class decorator `patch.object` honours `patch.TEST_PREFIX` 1694 for choosing which methods to wrap. 1695 """ 1696 if type(target) is str: 1697 raise TypeError( 1698 f"{target!r} must be the actual object to be patched, not a str" 1699 ) 1700 getter = lambda: target 1701 return _patch( 1702 getter, attribute, new, spec, create, 1703 spec_set, autospec, new_callable, kwargs, unsafe=unsafe 1704 ) 1705 1706 1707def _patch_multiple(target, spec=None, create=False, spec_set=None, 1708 autospec=None, new_callable=None, **kwargs): 1709 """Perform multiple patches in a single call. It takes the object to be 1710 patched (either as an object or a string to fetch the object by importing) 1711 and keyword arguments for the patches:: 1712 1713 with patch.multiple(settings, FIRST_PATCH='one', SECOND_PATCH='two'): 1714 ... 1715 1716 Use `DEFAULT` as the value if you want `patch.multiple` to create 1717 mocks for you. In this case the created mocks are passed into a decorated 1718 function by keyword, and a dictionary is returned when `patch.multiple` is 1719 used as a context manager. 1720 1721 `patch.multiple` can be used as a decorator, class decorator or a context 1722 manager. The arguments `spec`, `spec_set`, `create`, 1723 `autospec` and `new_callable` have the same meaning as for `patch`. These 1724 arguments will be applied to *all* patches done by `patch.multiple`. 1725 1726 When used as a class decorator `patch.multiple` honours `patch.TEST_PREFIX` 1727 for choosing which methods to wrap. 1728 """ 1729 if type(target) is str: 1730 getter = partial(pkgutil.resolve_name, target) 1731 else: 1732 getter = lambda: target 1733 1734 if not kwargs: 1735 raise ValueError( 1736 'Must supply at least one keyword argument with patch.multiple' 1737 ) 1738 # need to wrap in a list for python 3, where items is a view 1739 items = list(kwargs.items()) 1740 attribute, new = items[0] 1741 patcher = _patch( 1742 getter, attribute, new, spec, create, spec_set, 1743 autospec, new_callable, {} 1744 ) 1745 patcher.attribute_name = attribute 1746 for attribute, new in items[1:]: 1747 this_patcher = _patch( 1748 getter, attribute, new, spec, create, spec_set, 1749 autospec, new_callable, {} 1750 ) 1751 this_patcher.attribute_name = attribute 1752 patcher.additional_patchers.append(this_patcher) 1753 return patcher 1754 1755 1756def patch( 1757 target, new=DEFAULT, spec=None, create=False, 1758 spec_set=None, autospec=None, new_callable=None, *, unsafe=False, **kwargs 1759 ): 1760 """ 1761 `patch` acts as a function decorator, class decorator or a context 1762 manager. Inside the body of the function or with statement, the `target` 1763 is patched with a `new` object. When the function/with statement exits 1764 the patch is undone. 1765 1766 If `new` is omitted, then the target is replaced with an 1767 `AsyncMock if the patched object is an async function or a 1768 `MagicMock` otherwise. If `patch` is used as a decorator and `new` is 1769 omitted, the created mock is passed in as an extra argument to the 1770 decorated function. If `patch` is used as a context manager the created 1771 mock is returned by the context manager. 1772 1773 `target` should be a string in the form `'package.module.ClassName'`. The 1774 `target` is imported and the specified object replaced with the `new` 1775 object, so the `target` must be importable from the environment you are 1776 calling `patch` from. The target is imported when the decorated function 1777 is executed, not at decoration time. 1778 1779 The `spec` and `spec_set` keyword arguments are passed to the `MagicMock` 1780 if patch is creating one for you. 1781 1782 In addition you can pass `spec=True` or `spec_set=True`, which causes 1783 patch to pass in the object being mocked as the spec/spec_set object. 1784 1785 `new_callable` allows you to specify a different class, or callable object, 1786 that will be called to create the `new` object. By default `AsyncMock` is 1787 used for async functions and `MagicMock` for the rest. 1788 1789 A more powerful form of `spec` is `autospec`. If you set `autospec=True` 1790 then the mock will be created with a spec from the object being replaced. 1791 All attributes of the mock will also have the spec of the corresponding 1792 attribute of the object being replaced. Methods and functions being 1793 mocked will have their arguments checked and will raise a `TypeError` if 1794 they are called with the wrong signature. For mocks replacing a class, 1795 their return value (the 'instance') will have the same spec as the class. 1796 1797 Instead of `autospec=True` you can pass `autospec=some_object` to use an 1798 arbitrary object as the spec instead of the one being replaced. 1799 1800 By default `patch` will fail to replace attributes that don't exist. If 1801 you pass in `create=True`, and the attribute doesn't exist, patch will 1802 create the attribute for you when the patched function is called, and 1803 delete it again afterwards. This is useful for writing tests against 1804 attributes that your production code creates at runtime. It is off by 1805 default because it can be dangerous. With it switched on you can write 1806 passing tests against APIs that don't actually exist! 1807 1808 Patch can be used as a `TestCase` class decorator. It works by 1809 decorating each test method in the class. This reduces the boilerplate 1810 code when your test methods share a common patchings set. `patch` finds 1811 tests by looking for method names that start with `patch.TEST_PREFIX`. 1812 By default this is `test`, which matches the way `unittest` finds tests. 1813 You can specify an alternative prefix by setting `patch.TEST_PREFIX`. 1814 1815 Patch can be used as a context manager, with the with statement. Here the 1816 patching applies to the indented block after the with statement. If you 1817 use "as" then the patched object will be bound to the name after the 1818 "as"; very useful if `patch` is creating a mock object for you. 1819 1820 Patch will raise a `RuntimeError` if passed some common misspellings of 1821 the arguments autospec and spec_set. Pass the argument `unsafe` with the 1822 value True to disable that check. 1823 1824 `patch` takes arbitrary keyword arguments. These will be passed to 1825 `AsyncMock` if the patched object is asynchronous, to `MagicMock` 1826 otherwise or to `new_callable` if specified. 1827 1828 `patch.dict(...)`, `patch.multiple(...)` and `patch.object(...)` are 1829 available for alternate use-cases. 1830 """ 1831 getter, attribute = _get_target(target) 1832 return _patch( 1833 getter, attribute, new, spec, create, 1834 spec_set, autospec, new_callable, kwargs, unsafe=unsafe 1835 ) 1836 1837 1838class _patch_dict(object): 1839 """ 1840 Patch a dictionary, or dictionary like object, and restore the dictionary 1841 to its original state after the test. 1842 1843 `in_dict` can be a dictionary or a mapping like container. If it is a 1844 mapping then it must at least support getting, setting and deleting items 1845 plus iterating over keys. 1846 1847 `in_dict` can also be a string specifying the name of the dictionary, which 1848 will then be fetched by importing it. 1849 1850 `values` can be a dictionary of values to set in the dictionary. `values` 1851 can also be an iterable of `(key, value)` pairs. 1852 1853 If `clear` is True then the dictionary will be cleared before the new 1854 values are set. 1855 1856 `patch.dict` can also be called with arbitrary keyword arguments to set 1857 values in the dictionary:: 1858 1859 with patch.dict('sys.modules', mymodule=Mock(), other_module=Mock()): 1860 ... 1861 1862 `patch.dict` can be used as a context manager, decorator or class 1863 decorator. When used as a class decorator `patch.dict` honours 1864 `patch.TEST_PREFIX` for choosing which methods to wrap. 1865 """ 1866 1867 def __init__(self, in_dict, values=(), clear=False, **kwargs): 1868 self.in_dict = in_dict 1869 # support any argument supported by dict(...) constructor 1870 self.values = dict(values) 1871 self.values.update(kwargs) 1872 self.clear = clear 1873 self._original = None 1874 1875 1876 def __call__(self, f): 1877 if isinstance(f, type): 1878 return self.decorate_class(f) 1879 if inspect.iscoroutinefunction(f): 1880 return self.decorate_async_callable(f) 1881 return self.decorate_callable(f) 1882 1883 1884 def decorate_callable(self, f): 1885 @wraps(f) 1886 def _inner(*args, **kw): 1887 self._patch_dict() 1888 try: 1889 return f(*args, **kw) 1890 finally: 1891 self._unpatch_dict() 1892 1893 return _inner 1894 1895 1896 def decorate_async_callable(self, f): 1897 @wraps(f) 1898 async def _inner(*args, **kw): 1899 self._patch_dict() 1900 try: 1901 return await f(*args, **kw) 1902 finally: 1903 self._unpatch_dict() 1904 1905 return _inner 1906 1907 1908 def decorate_class(self, klass): 1909 for attr in dir(klass): 1910 attr_value = getattr(klass, attr) 1911 if (attr.startswith(patch.TEST_PREFIX) and 1912 hasattr(attr_value, "__call__")): 1913 decorator = _patch_dict(self.in_dict, self.values, self.clear) 1914 decorated = decorator(attr_value) 1915 setattr(klass, attr, decorated) 1916 return klass 1917 1918 1919 def __enter__(self): 1920 """Patch the dict.""" 1921 self._patch_dict() 1922 return self.in_dict 1923 1924 1925 def _patch_dict(self): 1926 values = self.values 1927 if isinstance(self.in_dict, str): 1928 self.in_dict = pkgutil.resolve_name(self.in_dict) 1929 in_dict = self.in_dict 1930 clear = self.clear 1931 1932 try: 1933 original = in_dict.copy() 1934 except AttributeError: 1935 # dict like object with no copy method 1936 # must support iteration over keys 1937 original = {} 1938 for key in in_dict: 1939 original[key] = in_dict[key] 1940 self._original = original 1941 1942 if clear: 1943 _clear_dict(in_dict) 1944 1945 try: 1946 in_dict.update(values) 1947 except AttributeError: 1948 # dict like object with no update method 1949 for key in values: 1950 in_dict[key] = values[key] 1951 1952 1953 def _unpatch_dict(self): 1954 in_dict = self.in_dict 1955 original = self._original 1956 1957 _clear_dict(in_dict) 1958 1959 try: 1960 in_dict.update(original) 1961 except AttributeError: 1962 for key in original: 1963 in_dict[key] = original[key] 1964 1965 1966 def __exit__(self, *args): 1967 """Unpatch the dict.""" 1968 if self._original is not None: 1969 self._unpatch_dict() 1970 return False 1971 1972 1973 def start(self): 1974 """Activate a patch, returning any created mock.""" 1975 result = self.__enter__() 1976 _patch._active_patches.append(self) 1977 return result 1978 1979 1980 def stop(self): 1981 """Stop an active patch.""" 1982 try: 1983 _patch._active_patches.remove(self) 1984 except ValueError: 1985 # If the patch hasn't been started this will fail 1986 return None 1987 1988 return self.__exit__(None, None, None) 1989 1990 1991def _clear_dict(in_dict): 1992 try: 1993 in_dict.clear() 1994 except AttributeError: 1995 keys = list(in_dict) 1996 for key in keys: 1997 del in_dict[key] 1998 1999 2000def _patch_stopall(): 2001 """Stop all active patches. LIFO to unroll nested patches.""" 2002 for patch in reversed(_patch._active_patches): 2003 patch.stop() 2004 2005 2006patch.object = _patch_object 2007patch.dict = _patch_dict 2008patch.multiple = _patch_multiple 2009patch.stopall = _patch_stopall 2010patch.TEST_PREFIX = 'test' 2011 2012magic_methods = ( 2013 "lt le gt ge eq ne " 2014 "getitem setitem delitem " 2015 "len contains iter " 2016 "hash str sizeof " 2017 "enter exit " 2018 # we added divmod and rdivmod here instead of numerics 2019 # because there is no idivmod 2020 "divmod rdivmod neg pos abs invert " 2021 "complex int float index " 2022 "round trunc floor ceil " 2023 "bool next " 2024 "fspath " 2025 "aiter " 2026) 2027 2028numerics = ( 2029 "add sub mul matmul truediv floordiv mod lshift rshift and xor or pow" 2030) 2031inplace = ' '.join('i%s' % n for n in numerics.split()) 2032right = ' '.join('r%s' % n for n in numerics.split()) 2033 2034# not including __prepare__, __instancecheck__, __subclasscheck__ 2035# (as they are metaclass methods) 2036# __del__ is not supported at all as it causes problems if it exists 2037 2038_non_defaults = { 2039 '__get__', '__set__', '__delete__', '__reversed__', '__missing__', 2040 '__reduce__', '__reduce_ex__', '__getinitargs__', '__getnewargs__', 2041 '__getstate__', '__setstate__', '__getformat__', 2042 '__repr__', '__dir__', '__subclasses__', '__format__', 2043 '__getnewargs_ex__', 2044} 2045 2046 2047def _get_method(name, func): 2048 "Turns a callable object (like a mock) into a real function" 2049 def method(self, /, *args, **kw): 2050 return func(self, *args, **kw) 2051 method.__name__ = name 2052 return method 2053 2054 2055_magics = { 2056 '__%s__' % method for method in 2057 ' '.join([magic_methods, numerics, inplace, right]).split() 2058} 2059 2060# Magic methods used for async `with` statements 2061_async_method_magics = {"__aenter__", "__aexit__", "__anext__"} 2062# Magic methods that are only used with async calls but are synchronous functions themselves 2063_sync_async_magics = {"__aiter__"} 2064_async_magics = _async_method_magics | _sync_async_magics 2065 2066_all_sync_magics = _magics | _non_defaults 2067_all_magics = _all_sync_magics | _async_magics 2068 2069_unsupported_magics = { 2070 '__getattr__', '__setattr__', 2071 '__init__', '__new__', '__prepare__', 2072 '__instancecheck__', '__subclasscheck__', 2073 '__del__' 2074} 2075 2076_calculate_return_value = { 2077 '__hash__': lambda self: object.__hash__(self), 2078 '__str__': lambda self: object.__str__(self), 2079 '__sizeof__': lambda self: object.__sizeof__(self), 2080 '__fspath__': lambda self: f"{type(self).__name__}/{self._extract_mock_name()}/{id(self)}", 2081} 2082 2083_return_values = { 2084 '__lt__': NotImplemented, 2085 '__gt__': NotImplemented, 2086 '__le__': NotImplemented, 2087 '__ge__': NotImplemented, 2088 '__int__': 1, 2089 '__contains__': False, 2090 '__len__': 0, 2091 '__exit__': False, 2092 '__complex__': 1j, 2093 '__float__': 1.0, 2094 '__bool__': True, 2095 '__index__': 1, 2096 '__aexit__': False, 2097} 2098 2099 2100def _get_eq(self): 2101 def __eq__(other): 2102 ret_val = self.__eq__._mock_return_value 2103 if ret_val is not DEFAULT: 2104 return ret_val 2105 if self is other: 2106 return True 2107 return NotImplemented 2108 return __eq__ 2109 2110def _get_ne(self): 2111 def __ne__(other): 2112 if self.__ne__._mock_return_value is not DEFAULT: 2113 return DEFAULT 2114 if self is other: 2115 return False 2116 return NotImplemented 2117 return __ne__ 2118 2119def _get_iter(self): 2120 def __iter__(): 2121 ret_val = self.__iter__._mock_return_value 2122 if ret_val is DEFAULT: 2123 return iter([]) 2124 # if ret_val was already an iterator, then calling iter on it should 2125 # return the iterator unchanged 2126 return iter(ret_val) 2127 return __iter__ 2128 2129def _get_async_iter(self): 2130 def __aiter__(): 2131 ret_val = self.__aiter__._mock_return_value 2132 if ret_val is DEFAULT: 2133 return _AsyncIterator(iter([])) 2134 return _AsyncIterator(iter(ret_val)) 2135 return __aiter__ 2136 2137_side_effect_methods = { 2138 '__eq__': _get_eq, 2139 '__ne__': _get_ne, 2140 '__iter__': _get_iter, 2141 '__aiter__': _get_async_iter 2142} 2143 2144 2145 2146def _set_return_value(mock, method, name): 2147 fixed = _return_values.get(name, DEFAULT) 2148 if fixed is not DEFAULT: 2149 method.return_value = fixed 2150 return 2151 2152 return_calculator = _calculate_return_value.get(name) 2153 if return_calculator is not None: 2154 return_value = return_calculator(mock) 2155 method.return_value = return_value 2156 return 2157 2158 side_effector = _side_effect_methods.get(name) 2159 if side_effector is not None: 2160 method.side_effect = side_effector(mock) 2161 2162 2163 2164class MagicMixin(Base): 2165 def __init__(self, /, *args, **kw): 2166 self._mock_set_magics() # make magic work for kwargs in init 2167 _safe_super(MagicMixin, self).__init__(*args, **kw) 2168 self._mock_set_magics() # fix magic broken by upper level init 2169 2170 2171 def _mock_set_magics(self): 2172 orig_magics = _magics | _async_method_magics 2173 these_magics = orig_magics 2174 2175 if getattr(self, "_mock_methods", None) is not None: 2176 these_magics = orig_magics.intersection(self._mock_methods) 2177 2178 remove_magics = set() 2179 remove_magics = orig_magics - these_magics 2180 2181 for entry in remove_magics: 2182 if entry in type(self).__dict__: 2183 # remove unneeded magic methods 2184 delattr(self, entry) 2185 2186 # don't overwrite existing attributes if called a second time 2187 these_magics = these_magics - set(type(self).__dict__) 2188 2189 _type = type(self) 2190 for entry in these_magics: 2191 setattr(_type, entry, MagicProxy(entry, self)) 2192 2193 2194 2195class NonCallableMagicMock(MagicMixin, NonCallableMock): 2196 """A version of `MagicMock` that isn't callable.""" 2197 def mock_add_spec(self, spec, spec_set=False): 2198 """Add a spec to a mock. `spec` can either be an object or a 2199 list of strings. Only attributes on the `spec` can be fetched as 2200 attributes from the mock. 2201 2202 If `spec_set` is True then only attributes on the spec can be set.""" 2203 self._mock_add_spec(spec, spec_set) 2204 self._mock_set_magics() 2205 2206 2207class AsyncMagicMixin(MagicMixin): 2208 pass 2209 2210 2211class MagicMock(MagicMixin, Mock): 2212 """ 2213 MagicMock is a subclass of Mock with default implementations 2214 of most of the magic methods. You can use MagicMock without having to 2215 configure the magic methods yourself. 2216 2217 If you use the `spec` or `spec_set` arguments then *only* magic 2218 methods that exist in the spec will be created. 2219 2220 Attributes and the return value of a `MagicMock` will also be `MagicMocks`. 2221 """ 2222 def mock_add_spec(self, spec, spec_set=False): 2223 """Add a spec to a mock. `spec` can either be an object or a 2224 list of strings. Only attributes on the `spec` can be fetched as 2225 attributes from the mock. 2226 2227 If `spec_set` is True then only attributes on the spec can be set.""" 2228 self._mock_add_spec(spec, spec_set) 2229 self._mock_set_magics() 2230 2231 def reset_mock(self, /, *args, return_value=False, **kwargs): 2232 if ( 2233 return_value 2234 and self._mock_name 2235 and _is_magic(self._mock_name) 2236 ): 2237 # Don't reset return values for magic methods, 2238 # otherwise `m.__str__` will start 2239 # to return `MagicMock` instances, instead of `str` instances. 2240 return_value = False 2241 super().reset_mock(*args, return_value=return_value, **kwargs) 2242 2243 2244class MagicProxy(Base): 2245 def __init__(self, name, parent): 2246 self.name = name 2247 self.parent = parent 2248 2249 def create_mock(self): 2250 entry = self.name 2251 parent = self.parent 2252 m = parent._get_child_mock(name=entry, _new_name=entry, 2253 _new_parent=parent) 2254 setattr(parent, entry, m) 2255 _set_return_value(parent, m, entry) 2256 return m 2257 2258 def __get__(self, obj, _type=None): 2259 return self.create_mock() 2260 2261 2262try: 2263 _CODE_SIG = inspect.signature(partial(CodeType.__init__, None)) 2264 _CODE_ATTRS = dir(CodeType) 2265except ValueError: 2266 _CODE_SIG = None 2267 2268 2269class AsyncMockMixin(Base): 2270 await_count = _delegating_property('await_count') 2271 await_args = _delegating_property('await_args') 2272 await_args_list = _delegating_property('await_args_list') 2273 2274 def __init__(self, /, *args, **kwargs): 2275 super().__init__(*args, **kwargs) 2276 # iscoroutinefunction() checks _is_coroutine property to say if an 2277 # object is a coroutine. Without this check it looks to see if it is a 2278 # function/method, which in this case it is not (since it is an 2279 # AsyncMock). 2280 # It is set through __dict__ because when spec_set is True, this 2281 # attribute is likely undefined. 2282 self.__dict__['_is_coroutine'] = asyncio.coroutines._is_coroutine 2283 self.__dict__['_mock_await_count'] = 0 2284 self.__dict__['_mock_await_args'] = None 2285 self.__dict__['_mock_await_args_list'] = _CallList() 2286 if _CODE_SIG: 2287 code_mock = NonCallableMock(spec_set=_CODE_ATTRS) 2288 code_mock.__dict__["_spec_class"] = CodeType 2289 code_mock.__dict__["_spec_signature"] = _CODE_SIG 2290 else: 2291 code_mock = NonCallableMock(spec_set=CodeType) 2292 code_mock.co_flags = ( 2293 inspect.CO_COROUTINE 2294 + inspect.CO_VARARGS 2295 + inspect.CO_VARKEYWORDS 2296 ) 2297 code_mock.co_argcount = 0 2298 code_mock.co_varnames = ('args', 'kwargs') 2299 code_mock.co_posonlyargcount = 0 2300 code_mock.co_kwonlyargcount = 0 2301 self.__dict__['__code__'] = code_mock 2302 self.__dict__['__name__'] = 'AsyncMock' 2303 self.__dict__['__defaults__'] = tuple() 2304 self.__dict__['__kwdefaults__'] = {} 2305 self.__dict__['__annotations__'] = None 2306 2307 async def _execute_mock_call(self, /, *args, **kwargs): 2308 # This is nearly just like super(), except for special handling 2309 # of coroutines 2310 2311 _call = _Call((args, kwargs), two=True) 2312 self.await_count += 1 2313 self.await_args = _call 2314 self.await_args_list.append(_call) 2315 2316 effect = self.side_effect 2317 if effect is not None: 2318 if _is_exception(effect): 2319 raise effect 2320 elif not _callable(effect): 2321 try: 2322 result = next(effect) 2323 except StopIteration: 2324 # It is impossible to propagate a StopIteration 2325 # through coroutines because of PEP 479 2326 raise StopAsyncIteration 2327 if _is_exception(result): 2328 raise result 2329 elif iscoroutinefunction(effect): 2330 result = await effect(*args, **kwargs) 2331 else: 2332 result = effect(*args, **kwargs) 2333 2334 if result is not DEFAULT: 2335 return result 2336 2337 if self._mock_return_value is not DEFAULT: 2338 return self.return_value 2339 2340 if self._mock_wraps is not None: 2341 if iscoroutinefunction(self._mock_wraps): 2342 return await self._mock_wraps(*args, **kwargs) 2343 return self._mock_wraps(*args, **kwargs) 2344 2345 return self.return_value 2346 2347 def assert_awaited(self): 2348 """ 2349 Assert that the mock was awaited at least once. 2350 """ 2351 if self.await_count == 0: 2352 msg = f"Expected {self._mock_name or 'mock'} to have been awaited." 2353 raise AssertionError(msg) 2354 2355 def assert_awaited_once(self): 2356 """ 2357 Assert that the mock was awaited exactly once. 2358 """ 2359 if not self.await_count == 1: 2360 msg = (f"Expected {self._mock_name or 'mock'} to have been awaited once." 2361 f" Awaited {self.await_count} times.") 2362 raise AssertionError(msg) 2363 2364 def assert_awaited_with(self, /, *args, **kwargs): 2365 """ 2366 Assert that the last await was with the specified arguments. 2367 """ 2368 if self.await_args is None: 2369 expected = self._format_mock_call_signature(args, kwargs) 2370 raise AssertionError(f'Expected await: {expected}\nNot awaited') 2371 2372 def _error_message(): 2373 msg = self._format_mock_failure_message(args, kwargs, action='await') 2374 return msg 2375 2376 expected = self._call_matcher(_Call((args, kwargs), two=True)) 2377 actual = self._call_matcher(self.await_args) 2378 if actual != expected: 2379 cause = expected if isinstance(expected, Exception) else None 2380 raise AssertionError(_error_message()) from cause 2381 2382 def assert_awaited_once_with(self, /, *args, **kwargs): 2383 """ 2384 Assert that the mock was awaited exactly once and with the specified 2385 arguments. 2386 """ 2387 if not self.await_count == 1: 2388 msg = (f"Expected {self._mock_name or 'mock'} to have been awaited once." 2389 f" Awaited {self.await_count} times.") 2390 raise AssertionError(msg) 2391 return self.assert_awaited_with(*args, **kwargs) 2392 2393 def assert_any_await(self, /, *args, **kwargs): 2394 """ 2395 Assert the mock has ever been awaited with the specified arguments. 2396 """ 2397 expected = self._call_matcher(_Call((args, kwargs), two=True)) 2398 cause = expected if isinstance(expected, Exception) else None 2399 actual = [self._call_matcher(c) for c in self.await_args_list] 2400 if cause or expected not in _AnyComparer(actual): 2401 expected_string = self._format_mock_call_signature(args, kwargs) 2402 raise AssertionError( 2403 '%s await not found' % expected_string 2404 ) from cause 2405 2406 def assert_has_awaits(self, calls, any_order=False): 2407 """ 2408 Assert the mock has been awaited with the specified calls. 2409 The :attr:`await_args_list` list is checked for the awaits. 2410 2411 If `any_order` is False (the default) then the awaits must be 2412 sequential. There can be extra calls before or after the 2413 specified awaits. 2414 2415 If `any_order` is True then the awaits can be in any order, but 2416 they must all appear in :attr:`await_args_list`. 2417 """ 2418 expected = [self._call_matcher(c) for c in calls] 2419 cause = next((e for e in expected if isinstance(e, Exception)), None) 2420 all_awaits = _CallList(self._call_matcher(c) for c in self.await_args_list) 2421 if not any_order: 2422 if expected not in all_awaits: 2423 if cause is None: 2424 problem = 'Awaits not found.' 2425 else: 2426 problem = ('Error processing expected awaits.\n' 2427 'Errors: {}').format( 2428 [e if isinstance(e, Exception) else None 2429 for e in expected]) 2430 raise AssertionError( 2431 f'{problem}\n' 2432 f'Expected: {_CallList(calls)}\n' 2433 f'Actual: {self.await_args_list}' 2434 ) from cause 2435 return 2436 2437 all_awaits = list(all_awaits) 2438 2439 not_found = [] 2440 for kall in expected: 2441 try: 2442 all_awaits.remove(kall) 2443 except ValueError: 2444 not_found.append(kall) 2445 if not_found: 2446 raise AssertionError( 2447 '%r not all found in await list' % (tuple(not_found),) 2448 ) from cause 2449 2450 def assert_not_awaited(self): 2451 """ 2452 Assert that the mock was never awaited. 2453 """ 2454 if self.await_count != 0: 2455 msg = (f"Expected {self._mock_name or 'mock'} to not have been awaited." 2456 f" Awaited {self.await_count} times.") 2457 raise AssertionError(msg) 2458 2459 def reset_mock(self, /, *args, **kwargs): 2460 """ 2461 See :func:`.Mock.reset_mock()` 2462 """ 2463 super().reset_mock(*args, **kwargs) 2464 self.await_count = 0 2465 self.await_args = None 2466 self.await_args_list = _CallList() 2467 2468 2469class AsyncMock(AsyncMockMixin, AsyncMagicMixin, Mock): 2470 """ 2471 Enhance :class:`Mock` with features allowing to mock 2472 an async function. 2473 2474 The :class:`AsyncMock` object will behave so the object is 2475 recognized as an async function, and the result of a call is an awaitable: 2476 2477 >>> mock = AsyncMock() 2478 >>> iscoroutinefunction(mock) 2479 True 2480 >>> inspect.isawaitable(mock()) 2481 True 2482 2483 2484 The result of ``mock()`` is an async function which will have the outcome 2485 of ``side_effect`` or ``return_value``: 2486 2487 - if ``side_effect`` is a function, the async function will return the 2488 result of that function, 2489 - if ``side_effect`` is an exception, the async function will raise the 2490 exception, 2491 - if ``side_effect`` is an iterable, the async function will return the 2492 next value of the iterable, however, if the sequence of result is 2493 exhausted, ``StopIteration`` is raised immediately, 2494 - if ``side_effect`` is not defined, the async function will return the 2495 value defined by ``return_value``, hence, by default, the async function 2496 returns a new :class:`AsyncMock` object. 2497 2498 If the outcome of ``side_effect`` or ``return_value`` is an async function, 2499 the mock async function obtained when the mock object is called will be this 2500 async function itself (and not an async function returning an async 2501 function). 2502 2503 The test author can also specify a wrapped object with ``wraps``. In this 2504 case, the :class:`Mock` object behavior is the same as with an 2505 :class:`.Mock` object: the wrapped object may have methods 2506 defined as async function functions. 2507 2508 Based on Martin Richard's asynctest project. 2509 """ 2510 2511 2512class _ANY(object): 2513 "A helper object that compares equal to everything." 2514 2515 def __eq__(self, other): 2516 return True 2517 2518 def __ne__(self, other): 2519 return False 2520 2521 def __repr__(self): 2522 return '<ANY>' 2523 2524ANY = _ANY() 2525 2526 2527 2528def _format_call_signature(name, args, kwargs): 2529 message = '%s(%%s)' % name 2530 formatted_args = '' 2531 args_string = ', '.join([repr(arg) for arg in args]) 2532 kwargs_string = ', '.join([ 2533 '%s=%r' % (key, value) for key, value in kwargs.items() 2534 ]) 2535 if args_string: 2536 formatted_args = args_string 2537 if kwargs_string: 2538 if formatted_args: 2539 formatted_args += ', ' 2540 formatted_args += kwargs_string 2541 2542 return message % formatted_args 2543 2544 2545 2546class _Call(tuple): 2547 """ 2548 A tuple for holding the results of a call to a mock, either in the form 2549 `(args, kwargs)` or `(name, args, kwargs)`. 2550 2551 If args or kwargs are empty then a call tuple will compare equal to 2552 a tuple without those values. This makes comparisons less verbose:: 2553 2554 _Call(('name', (), {})) == ('name',) 2555 _Call(('name', (1,), {})) == ('name', (1,)) 2556 _Call(((), {'a': 'b'})) == ({'a': 'b'},) 2557 2558 The `_Call` object provides a useful shortcut for comparing with call:: 2559 2560 _Call(((1, 2), {'a': 3})) == call(1, 2, a=3) 2561 _Call(('foo', (1, 2), {'a': 3})) == call.foo(1, 2, a=3) 2562 2563 If the _Call has no name then it will match any name. 2564 """ 2565 def __new__(cls, value=(), name='', parent=None, two=False, 2566 from_kall=True): 2567 args = () 2568 kwargs = {} 2569 _len = len(value) 2570 if _len == 3: 2571 name, args, kwargs = value 2572 elif _len == 2: 2573 first, second = value 2574 if isinstance(first, str): 2575 name = first 2576 if isinstance(second, tuple): 2577 args = second 2578 else: 2579 kwargs = second 2580 else: 2581 args, kwargs = first, second 2582 elif _len == 1: 2583 value, = value 2584 if isinstance(value, str): 2585 name = value 2586 elif isinstance(value, tuple): 2587 args = value 2588 else: 2589 kwargs = value 2590 2591 if two: 2592 return tuple.__new__(cls, (args, kwargs)) 2593 2594 return tuple.__new__(cls, (name, args, kwargs)) 2595 2596 2597 def __init__(self, value=(), name=None, parent=None, two=False, 2598 from_kall=True): 2599 self._mock_name = name 2600 self._mock_parent = parent 2601 self._mock_from_kall = from_kall 2602 2603 2604 def __eq__(self, other): 2605 try: 2606 len_other = len(other) 2607 except TypeError: 2608 return NotImplemented 2609 2610 self_name = '' 2611 if len(self) == 2: 2612 self_args, self_kwargs = self 2613 else: 2614 self_name, self_args, self_kwargs = self 2615 2616 if (getattr(self, '_mock_parent', None) and getattr(other, '_mock_parent', None) 2617 and self._mock_parent != other._mock_parent): 2618 return False 2619 2620 other_name = '' 2621 if len_other == 0: 2622 other_args, other_kwargs = (), {} 2623 elif len_other == 3: 2624 other_name, other_args, other_kwargs = other 2625 elif len_other == 1: 2626 value, = other 2627 if isinstance(value, tuple): 2628 other_args = value 2629 other_kwargs = {} 2630 elif isinstance(value, str): 2631 other_name = value 2632 other_args, other_kwargs = (), {} 2633 else: 2634 other_args = () 2635 other_kwargs = value 2636 elif len_other == 2: 2637 # could be (name, args) or (name, kwargs) or (args, kwargs) 2638 first, second = other 2639 if isinstance(first, str): 2640 other_name = first 2641 if isinstance(second, tuple): 2642 other_args, other_kwargs = second, {} 2643 else: 2644 other_args, other_kwargs = (), second 2645 else: 2646 other_args, other_kwargs = first, second 2647 else: 2648 return False 2649 2650 if self_name and other_name != self_name: 2651 return False 2652 2653 # this order is important for ANY to work! 2654 return (other_args, other_kwargs) == (self_args, self_kwargs) 2655 2656 2657 __ne__ = object.__ne__ 2658 2659 2660 def __call__(self, /, *args, **kwargs): 2661 if self._mock_name is None: 2662 return _Call(('', args, kwargs), name='()') 2663 2664 name = self._mock_name + '()' 2665 return _Call((self._mock_name, args, kwargs), name=name, parent=self) 2666 2667 2668 def __getattr__(self, attr): 2669 if self._mock_name is None: 2670 return _Call(name=attr, from_kall=False) 2671 name = '%s.%s' % (self._mock_name, attr) 2672 return _Call(name=name, parent=self, from_kall=False) 2673 2674 2675 def __getattribute__(self, attr): 2676 if attr in tuple.__dict__: 2677 raise AttributeError 2678 return tuple.__getattribute__(self, attr) 2679 2680 2681 def _get_call_arguments(self): 2682 if len(self) == 2: 2683 args, kwargs = self 2684 else: 2685 name, args, kwargs = self 2686 2687 return args, kwargs 2688 2689 @property 2690 def args(self): 2691 return self._get_call_arguments()[0] 2692 2693 @property 2694 def kwargs(self): 2695 return self._get_call_arguments()[1] 2696 2697 def __repr__(self): 2698 if not self._mock_from_kall: 2699 name = self._mock_name or 'call' 2700 if name.startswith('()'): 2701 name = 'call%s' % name 2702 return name 2703 2704 if len(self) == 2: 2705 name = 'call' 2706 args, kwargs = self 2707 else: 2708 name, args, kwargs = self 2709 if not name: 2710 name = 'call' 2711 elif not name.startswith('()'): 2712 name = 'call.%s' % name 2713 else: 2714 name = 'call%s' % name 2715 return _format_call_signature(name, args, kwargs) 2716 2717 2718 def call_list(self): 2719 """For a call object that represents multiple calls, `call_list` 2720 returns a list of all the intermediate calls as well as the 2721 final call.""" 2722 vals = [] 2723 thing = self 2724 while thing is not None: 2725 if thing._mock_from_kall: 2726 vals.append(thing) 2727 thing = thing._mock_parent 2728 return _CallList(reversed(vals)) 2729 2730 2731call = _Call(from_kall=False) 2732 2733 2734def create_autospec(spec, spec_set=False, instance=False, _parent=None, 2735 _name=None, *, unsafe=False, **kwargs): 2736 """Create a mock object using another object as a spec. Attributes on the 2737 mock will use the corresponding attribute on the `spec` object as their 2738 spec. 2739 2740 Functions or methods being mocked will have their arguments checked 2741 to check that they are called with the correct signature. 2742 2743 If `spec_set` is True then attempting to set attributes that don't exist 2744 on the spec object will raise an `AttributeError`. 2745 2746 If a class is used as a spec then the return value of the mock (the 2747 instance of the class) will have the same spec. You can use a class as the 2748 spec for an instance object by passing `instance=True`. The returned mock 2749 will only be callable if instances of the mock are callable. 2750 2751 `create_autospec` will raise a `RuntimeError` if passed some common 2752 misspellings of the arguments autospec and spec_set. Pass the argument 2753 `unsafe` with the value True to disable that check. 2754 2755 `create_autospec` also takes arbitrary keyword arguments that are passed to 2756 the constructor of the created mock.""" 2757 if _is_list(spec): 2758 # can't pass a list instance to the mock constructor as it will be 2759 # interpreted as a list of strings 2760 spec = type(spec) 2761 2762 is_type = isinstance(spec, type) 2763 if _is_instance_mock(spec): 2764 raise InvalidSpecError(f'Cannot autospec a Mock object. ' 2765 f'[object={spec!r}]') 2766 is_async_func = _is_async_func(spec) 2767 _kwargs = {'spec': spec} 2768 if spec_set: 2769 _kwargs = {'spec_set': spec} 2770 elif spec is None: 2771 # None we mock with a normal mock without a spec 2772 _kwargs = {} 2773 if _kwargs and instance: 2774 _kwargs['_spec_as_instance'] = True 2775 if not unsafe: 2776 _check_spec_arg_typos(kwargs) 2777 2778 _name = kwargs.pop('name', _name) 2779 _new_name = _name 2780 if _parent is None: 2781 # for a top level object no _new_name should be set 2782 _new_name = '' 2783 2784 _kwargs.update(kwargs) 2785 2786 Klass = MagicMock 2787 if inspect.isdatadescriptor(spec): 2788 # descriptors don't have a spec 2789 # because we don't know what type they return 2790 _kwargs = {} 2791 elif is_async_func: 2792 if instance: 2793 raise RuntimeError("Instance can not be True when create_autospec " 2794 "is mocking an async function") 2795 Klass = AsyncMock 2796 elif not _callable(spec): 2797 Klass = NonCallableMagicMock 2798 elif is_type and instance and not _instance_callable(spec): 2799 Klass = NonCallableMagicMock 2800 2801 mock = Klass(parent=_parent, _new_parent=_parent, _new_name=_new_name, 2802 name=_name, **_kwargs) 2803 2804 if isinstance(spec, FunctionTypes): 2805 # should only happen at the top level because we don't 2806 # recurse for functions 2807 if is_async_func: 2808 mock = _set_async_signature(mock, spec) 2809 else: 2810 mock = _set_signature(mock, spec) 2811 else: 2812 _check_signature(spec, mock, is_type, instance) 2813 2814 if _parent is not None and not instance: 2815 _parent._mock_children[_name] = mock 2816 2817 # Pop wraps from kwargs because it must not be passed to configure_mock. 2818 wrapped = kwargs.pop('wraps', None) 2819 if is_type and not instance and 'return_value' not in kwargs: 2820 mock.return_value = create_autospec(spec, spec_set, instance=True, 2821 _name='()', _parent=mock, 2822 wraps=wrapped) 2823 2824 for entry in dir(spec): 2825 if _is_magic(entry): 2826 # MagicMock already does the useful magic methods for us 2827 continue 2828 2829 # XXXX do we need a better way of getting attributes without 2830 # triggering code execution (?) Probably not - we need the actual 2831 # object to mock it so we would rather trigger a property than mock 2832 # the property descriptor. Likewise we want to mock out dynamically 2833 # provided attributes. 2834 # XXXX what about attributes that raise exceptions other than 2835 # AttributeError on being fetched? 2836 # we could be resilient against it, or catch and propagate the 2837 # exception when the attribute is fetched from the mock 2838 try: 2839 original = getattr(spec, entry) 2840 except AttributeError: 2841 continue 2842 2843 child_kwargs = {'spec': original} 2844 # Wrap child attributes also. 2845 if wrapped and hasattr(wrapped, entry): 2846 child_kwargs.update(wraps=original) 2847 if spec_set: 2848 child_kwargs = {'spec_set': original} 2849 2850 if not isinstance(original, FunctionTypes): 2851 new = _SpecState(original, spec_set, mock, entry, instance) 2852 mock._mock_children[entry] = new 2853 else: 2854 parent = mock 2855 if isinstance(spec, FunctionTypes): 2856 parent = mock.mock 2857 2858 skipfirst = _must_skip(spec, entry, is_type) 2859 child_kwargs['_eat_self'] = skipfirst 2860 if iscoroutinefunction(original): 2861 child_klass = AsyncMock 2862 else: 2863 child_klass = MagicMock 2864 new = child_klass(parent=parent, name=entry, _new_name=entry, 2865 _new_parent=parent, **child_kwargs) 2866 mock._mock_children[entry] = new 2867 new.return_value = child_klass() 2868 _check_signature(original, new, skipfirst=skipfirst) 2869 2870 # so functions created with _set_signature become instance attributes, 2871 # *plus* their underlying mock exists in _mock_children of the parent 2872 # mock. Adding to _mock_children may be unnecessary where we are also 2873 # setting as an instance attribute? 2874 if isinstance(new, FunctionTypes): 2875 setattr(mock, entry, new) 2876 # kwargs are passed with respect to the parent mock so, they are not used 2877 # for creating return_value of the parent mock. So, this condition 2878 # should be true only for the parent mock if kwargs are given. 2879 if _is_instance_mock(mock) and kwargs: 2880 mock.configure_mock(**kwargs) 2881 2882 return mock 2883 2884 2885def _must_skip(spec, entry, is_type): 2886 """ 2887 Return whether we should skip the first argument on spec's `entry` 2888 attribute. 2889 """ 2890 if not isinstance(spec, type): 2891 if entry in getattr(spec, '__dict__', {}): 2892 # instance attribute - shouldn't skip 2893 return False 2894 spec = spec.__class__ 2895 2896 for klass in spec.__mro__: 2897 result = klass.__dict__.get(entry, DEFAULT) 2898 if result is DEFAULT: 2899 continue 2900 if isinstance(result, (staticmethod, classmethod)): 2901 return False 2902 elif isinstance(result, FunctionTypes): 2903 # Normal method => skip if looked up on type 2904 # (if looked up on instance, self is already skipped) 2905 return is_type 2906 else: 2907 return False 2908 2909 # function is a dynamically provided attribute 2910 return is_type 2911 2912 2913class _SpecState(object): 2914 2915 def __init__(self, spec, spec_set=False, parent=None, 2916 name=None, ids=None, instance=False): 2917 self.spec = spec 2918 self.ids = ids 2919 self.spec_set = spec_set 2920 self.parent = parent 2921 self.instance = instance 2922 self.name = name 2923 2924 2925FunctionTypes = ( 2926 # python function 2927 type(create_autospec), 2928 # instance method 2929 type(ANY.__eq__), 2930) 2931 2932 2933file_spec = None 2934open_spec = None 2935 2936 2937def _to_stream(read_data): 2938 if isinstance(read_data, bytes): 2939 return io.BytesIO(read_data) 2940 else: 2941 return io.StringIO(read_data) 2942 2943 2944def mock_open(mock=None, read_data=''): 2945 """ 2946 A helper function to create a mock to replace the use of `open`. It works 2947 for `open` called directly or used as a context manager. 2948 2949 The `mock` argument is the mock object to configure. If `None` (the 2950 default) then a `MagicMock` will be created for you, with the API limited 2951 to methods or attributes available on standard file handles. 2952 2953 `read_data` is a string for the `read`, `readline` and `readlines` of the 2954 file handle to return. This is an empty string by default. 2955 """ 2956 _read_data = _to_stream(read_data) 2957 _state = [_read_data, None] 2958 2959 def _readlines_side_effect(*args, **kwargs): 2960 if handle.readlines.return_value is not None: 2961 return handle.readlines.return_value 2962 return _state[0].readlines(*args, **kwargs) 2963 2964 def _read_side_effect(*args, **kwargs): 2965 if handle.read.return_value is not None: 2966 return handle.read.return_value 2967 return _state[0].read(*args, **kwargs) 2968 2969 def _readline_side_effect(*args, **kwargs): 2970 yield from _iter_side_effect() 2971 while True: 2972 yield _state[0].readline(*args, **kwargs) 2973 2974 def _iter_side_effect(): 2975 if handle.readline.return_value is not None: 2976 while True: 2977 yield handle.readline.return_value 2978 for line in _state[0]: 2979 yield line 2980 2981 def _next_side_effect(): 2982 if handle.readline.return_value is not None: 2983 return handle.readline.return_value 2984 return next(_state[0]) 2985 2986 def _exit_side_effect(exctype, excinst, exctb): 2987 handle.close() 2988 2989 global file_spec 2990 if file_spec is None: 2991 import _io 2992 file_spec = list(set(dir(_io.TextIOWrapper)).union(set(dir(_io.BytesIO)))) 2993 2994 global open_spec 2995 if open_spec is None: 2996 import _io 2997 open_spec = list(set(dir(_io.open))) 2998 if mock is None: 2999 mock = MagicMock(name='open', spec=open_spec) 3000 3001 handle = MagicMock(spec=file_spec) 3002 handle.__enter__.return_value = handle 3003 3004 handle.write.return_value = None 3005 handle.read.return_value = None 3006 handle.readline.return_value = None 3007 handle.readlines.return_value = None 3008 3009 handle.read.side_effect = _read_side_effect 3010 _state[1] = _readline_side_effect() 3011 handle.readline.side_effect = _state[1] 3012 handle.readlines.side_effect = _readlines_side_effect 3013 handle.__iter__.side_effect = _iter_side_effect 3014 handle.__next__.side_effect = _next_side_effect 3015 handle.__exit__.side_effect = _exit_side_effect 3016 3017 def reset_data(*args, **kwargs): 3018 _state[0] = _to_stream(read_data) 3019 if handle.readline.side_effect == _state[1]: 3020 # Only reset the side effect if the user hasn't overridden it. 3021 _state[1] = _readline_side_effect() 3022 handle.readline.side_effect = _state[1] 3023 return DEFAULT 3024 3025 mock.side_effect = reset_data 3026 mock.return_value = handle 3027 return mock 3028 3029 3030class PropertyMock(Mock): 3031 """ 3032 A mock intended to be used as a property, or other descriptor, on a class. 3033 `PropertyMock` provides `__get__` and `__set__` methods so you can specify 3034 a return value when it is fetched. 3035 3036 Fetching a `PropertyMock` instance from an object calls the mock, with 3037 no args. Setting it calls the mock with the value being set. 3038 """ 3039 def _get_child_mock(self, /, **kwargs): 3040 return MagicMock(**kwargs) 3041 3042 def __get__(self, obj, obj_type=None): 3043 return self() 3044 def __set__(self, obj, val): 3045 self(val) 3046 3047 3048_timeout_unset = sentinel.TIMEOUT_UNSET 3049 3050class ThreadingMixin(Base): 3051 3052 DEFAULT_TIMEOUT = None 3053 3054 def _get_child_mock(self, /, **kw): 3055 if isinstance(kw.get("parent"), ThreadingMixin): 3056 kw["timeout"] = kw["parent"]._mock_wait_timeout 3057 elif isinstance(kw.get("_new_parent"), ThreadingMixin): 3058 kw["timeout"] = kw["_new_parent"]._mock_wait_timeout 3059 return super()._get_child_mock(**kw) 3060 3061 def __init__(self, *args, timeout=_timeout_unset, **kwargs): 3062 super().__init__(*args, **kwargs) 3063 if timeout is _timeout_unset: 3064 timeout = self.DEFAULT_TIMEOUT 3065 self.__dict__["_mock_event"] = threading.Event() # Event for any call 3066 self.__dict__["_mock_calls_events"] = [] # Events for each of the calls 3067 self.__dict__["_mock_calls_events_lock"] = threading.Lock() 3068 self.__dict__["_mock_wait_timeout"] = timeout 3069 3070 def reset_mock(self, /, *args, **kwargs): 3071 """ 3072 See :func:`.Mock.reset_mock()` 3073 """ 3074 super().reset_mock(*args, **kwargs) 3075 self.__dict__["_mock_event"] = threading.Event() 3076 self.__dict__["_mock_calls_events"] = [] 3077 3078 def __get_event(self, expected_args, expected_kwargs): 3079 with self._mock_calls_events_lock: 3080 for args, kwargs, event in self._mock_calls_events: 3081 if (args, kwargs) == (expected_args, expected_kwargs): 3082 return event 3083 new_event = threading.Event() 3084 self._mock_calls_events.append((expected_args, expected_kwargs, new_event)) 3085 return new_event 3086 3087 def _mock_call(self, *args, **kwargs): 3088 ret_value = super()._mock_call(*args, **kwargs) 3089 3090 call_event = self.__get_event(args, kwargs) 3091 call_event.set() 3092 3093 self._mock_event.set() 3094 3095 return ret_value 3096 3097 def wait_until_called(self, *, timeout=_timeout_unset): 3098 """Wait until the mock object is called. 3099 3100 `timeout` - time to wait for in seconds, waits forever otherwise. 3101 Defaults to the constructor provided timeout. 3102 Use None to block undefinetively. 3103 """ 3104 if timeout is _timeout_unset: 3105 timeout = self._mock_wait_timeout 3106 if not self._mock_event.wait(timeout=timeout): 3107 msg = (f"{self._mock_name or 'mock'} was not called before" 3108 f" timeout({timeout}).") 3109 raise AssertionError(msg) 3110 3111 def wait_until_any_call_with(self, *args, **kwargs): 3112 """Wait until the mock object is called with given args. 3113 3114 Waits for the timeout in seconds provided in the constructor. 3115 """ 3116 event = self.__get_event(args, kwargs) 3117 if not event.wait(timeout=self._mock_wait_timeout): 3118 expected_string = self._format_mock_call_signature(args, kwargs) 3119 raise AssertionError(f'{expected_string} call not found') 3120 3121 3122class ThreadingMock(ThreadingMixin, MagicMixin, Mock): 3123 """ 3124 A mock that can be used to wait until on calls happening 3125 in a different thread. 3126 3127 The constructor can take a `timeout` argument which 3128 controls the timeout in seconds for all `wait` calls of the mock. 3129 3130 You can change the default timeout of all instances via the 3131 `ThreadingMock.DEFAULT_TIMEOUT` attribute. 3132 3133 If no timeout is set, it will block undefinetively. 3134 """ 3135 pass 3136 3137 3138def seal(mock): 3139 """Disable the automatic generation of child mocks. 3140 3141 Given an input Mock, seals it to ensure no further mocks will be generated 3142 when accessing an attribute that was not already defined. 3143 3144 The operation recursively seals the mock passed in, meaning that 3145 the mock itself, any mocks generated by accessing one of its attributes, 3146 and all assigned mocks without a name or spec will be sealed. 3147 """ 3148 mock._mock_sealed = True 3149 for attr in dir(mock): 3150 try: 3151 m = getattr(mock, attr) 3152 except AttributeError: 3153 continue 3154 if not isinstance(m, NonCallableMock): 3155 continue 3156 if isinstance(m._mock_children.get(attr), _SpecState): 3157 continue 3158 if m._mock_new_parent is mock: 3159 seal(m) 3160 3161 3162class _AsyncIterator: 3163 """ 3164 Wraps an iterator in an asynchronous iterator. 3165 """ 3166 def __init__(self, iterator): 3167 self.iterator = iterator 3168 code_mock = NonCallableMock(spec_set=CodeType) 3169 code_mock.co_flags = inspect.CO_ITERABLE_COROUTINE 3170 self.__dict__['__code__'] = code_mock 3171 3172 async def __anext__(self): 3173 try: 3174 return next(self.iterator) 3175 except StopIteration: 3176 pass 3177 raise StopAsyncIteration 3178