1# mock.py 2# Test tools for mocking and patching. 3# Maintained by Michael Foord 4# Backport for other versions of Python available from 5# https://pypi.org/project/mock 6 7__all__ = ( 8 'Mock', 9 'MagicMock', 10 'patch', 11 'sentinel', 12 'DEFAULT', 13 'ANY', 14 'call', 15 'create_autospec', 16 'AsyncMock', 17 'FILTER_DIR', 18 'NonCallableMock', 19 'NonCallableMagicMock', 20 'mock_open', 21 'PropertyMock', 22 'seal', 23) 24 25 26import asyncio 27import contextlib 28import io 29import inspect 30import pprint 31import sys 32import builtins 33from asyncio import iscoroutinefunction 34from types import CodeType, ModuleType, MethodType 35from unittest.util import safe_repr 36from functools import wraps, partial 37 38 39class InvalidSpecError(Exception): 40 """Indicates that an invalid value was used as a mock spec.""" 41 42 43_builtins = {name for name in dir(builtins) if not name.startswith('_')} 44 45FILTER_DIR = True 46 47# Workaround for issue #12370 48# Without this, the __class__ properties wouldn't be set correctly 49_safe_super = super 50 51def _is_async_obj(obj): 52 if _is_instance_mock(obj) and not isinstance(obj, AsyncMock): 53 return False 54 if hasattr(obj, '__func__'): 55 obj = getattr(obj, '__func__') 56 return iscoroutinefunction(obj) or inspect.isawaitable(obj) 57 58 59def _is_async_func(func): 60 if getattr(func, '__code__', None): 61 return iscoroutinefunction(func) 62 else: 63 return False 64 65 66def _is_instance_mock(obj): 67 # can't use isinstance on Mock objects because they override __class__ 68 # The base class for all mocks is NonCallableMock 69 return issubclass(type(obj), NonCallableMock) 70 71 72def _is_exception(obj): 73 return ( 74 isinstance(obj, BaseException) or 75 isinstance(obj, type) and issubclass(obj, BaseException) 76 ) 77 78 79def _extract_mock(obj): 80 # Autospecced functions will return a FunctionType with "mock" attribute 81 # which is the actual mock object that needs to be used. 82 if isinstance(obj, FunctionTypes) and hasattr(obj, 'mock'): 83 return obj.mock 84 else: 85 return obj 86 87 88def _get_signature_object(func, as_instance, eat_self): 89 """ 90 Given an arbitrary, possibly callable object, try to create a suitable 91 signature object. 92 Return a (reduced func, signature) tuple, or None. 93 """ 94 if isinstance(func, type) and not as_instance: 95 # If it's a type and should be modelled as a type, use __init__. 96 func = func.__init__ 97 # Skip the `self` argument in __init__ 98 eat_self = True 99 elif not isinstance(func, FunctionTypes): 100 # If we really want to model an instance of the passed type, 101 # __call__ should be looked up, not __init__. 102 try: 103 func = func.__call__ 104 except AttributeError: 105 return None 106 if eat_self: 107 sig_func = partial(func, None) 108 else: 109 sig_func = func 110 try: 111 return func, inspect.signature(sig_func) 112 except ValueError: 113 # Certain callable types are not supported by inspect.signature() 114 return None 115 116 117def _check_signature(func, mock, skipfirst, instance=False): 118 sig = _get_signature_object(func, instance, skipfirst) 119 if sig is None: 120 return 121 func, sig = sig 122 def checksig(self, /, *args, **kwargs): 123 sig.bind(*args, **kwargs) 124 _copy_func_details(func, checksig) 125 type(mock)._mock_check_sig = checksig 126 type(mock).__signature__ = sig 127 128 129def _copy_func_details(func, funcopy): 130 # we explicitly don't copy func.__dict__ into this copy as it would 131 # expose original attributes that should be mocked 132 for attribute in ( 133 '__name__', '__doc__', '__text_signature__', 134 '__module__', '__defaults__', '__kwdefaults__', 135 ): 136 try: 137 setattr(funcopy, attribute, getattr(func, attribute)) 138 except AttributeError: 139 pass 140 141 142def _callable(obj): 143 if isinstance(obj, type): 144 return True 145 if isinstance(obj, (staticmethod, classmethod, MethodType)): 146 return _callable(obj.__func__) 147 if getattr(obj, '__call__', None) is not None: 148 return True 149 return False 150 151 152def _is_list(obj): 153 # checks for list or tuples 154 # XXXX badly named! 155 return type(obj) in (list, tuple) 156 157 158def _instance_callable(obj): 159 """Given an object, return True if the object is callable. 160 For classes, return True if instances would be callable.""" 161 if not isinstance(obj, type): 162 # already an instance 163 return getattr(obj, '__call__', None) is not None 164 165 # *could* be broken by a class overriding __mro__ or __dict__ via 166 # a metaclass 167 for base in (obj,) + obj.__mro__: 168 if base.__dict__.get('__call__') is not None: 169 return True 170 return False 171 172 173def _set_signature(mock, original, instance=False): 174 # creates a function with signature (*args, **kwargs) that delegates to a 175 # mock. It still does signature checking by calling a lambda with the same 176 # signature as the original. 177 178 skipfirst = isinstance(original, type) 179 result = _get_signature_object(original, instance, skipfirst) 180 if result is None: 181 return mock 182 func, sig = result 183 def checksig(*args, **kwargs): 184 sig.bind(*args, **kwargs) 185 _copy_func_details(func, checksig) 186 187 name = original.__name__ 188 if not name.isidentifier(): 189 name = 'funcopy' 190 context = {'_checksig_': checksig, 'mock': mock} 191 src = """def %s(*args, **kwargs): 192 _checksig_(*args, **kwargs) 193 return mock(*args, **kwargs)""" % name 194 exec (src, context) 195 funcopy = context[name] 196 _setup_func(funcopy, mock, sig) 197 return funcopy 198 199 200def _setup_func(funcopy, mock, sig): 201 funcopy.mock = mock 202 203 def assert_called_with(*args, **kwargs): 204 return mock.assert_called_with(*args, **kwargs) 205 def assert_called(*args, **kwargs): 206 return mock.assert_called(*args, **kwargs) 207 def assert_not_called(*args, **kwargs): 208 return mock.assert_not_called(*args, **kwargs) 209 def assert_called_once(*args, **kwargs): 210 return mock.assert_called_once(*args, **kwargs) 211 def assert_called_once_with(*args, **kwargs): 212 return mock.assert_called_once_with(*args, **kwargs) 213 def assert_has_calls(*args, **kwargs): 214 return mock.assert_has_calls(*args, **kwargs) 215 def assert_any_call(*args, **kwargs): 216 return mock.assert_any_call(*args, **kwargs) 217 def reset_mock(): 218 funcopy.method_calls = _CallList() 219 funcopy.mock_calls = _CallList() 220 mock.reset_mock() 221 ret = funcopy.return_value 222 if _is_instance_mock(ret) and not ret is mock: 223 ret.reset_mock() 224 225 funcopy.called = False 226 funcopy.call_count = 0 227 funcopy.call_args = None 228 funcopy.call_args_list = _CallList() 229 funcopy.method_calls = _CallList() 230 funcopy.mock_calls = _CallList() 231 232 funcopy.return_value = mock.return_value 233 funcopy.side_effect = mock.side_effect 234 funcopy._mock_children = mock._mock_children 235 236 funcopy.assert_called_with = assert_called_with 237 funcopy.assert_called_once_with = assert_called_once_with 238 funcopy.assert_has_calls = assert_has_calls 239 funcopy.assert_any_call = assert_any_call 240 funcopy.reset_mock = reset_mock 241 funcopy.assert_called = assert_called 242 funcopy.assert_not_called = assert_not_called 243 funcopy.assert_called_once = assert_called_once 244 funcopy.__signature__ = sig 245 246 mock._mock_delegate = funcopy 247 248 249def _setup_async_mock(mock): 250 mock._is_coroutine = asyncio.coroutines._is_coroutine 251 mock.await_count = 0 252 mock.await_args = None 253 mock.await_args_list = _CallList() 254 255 # Mock is not configured yet so the attributes are set 256 # to a function and then the corresponding mock helper function 257 # is called when the helper is accessed similar to _setup_func. 258 def wrapper(attr, /, *args, **kwargs): 259 return getattr(mock.mock, attr)(*args, **kwargs) 260 261 for attribute in ('assert_awaited', 262 'assert_awaited_once', 263 'assert_awaited_with', 264 'assert_awaited_once_with', 265 'assert_any_await', 266 'assert_has_awaits', 267 'assert_not_awaited'): 268 269 # setattr(mock, attribute, wrapper) causes late binding 270 # hence attribute will always be the last value in the loop 271 # Use partial(wrapper, attribute) to ensure the attribute is bound 272 # correctly. 273 setattr(mock, attribute, partial(wrapper, attribute)) 274 275 276def _is_magic(name): 277 return '__%s__' % name[2:-2] == name 278 279 280class _SentinelObject(object): 281 "A unique, named, sentinel object." 282 def __init__(self, name): 283 self.name = name 284 285 def __repr__(self): 286 return 'sentinel.%s' % self.name 287 288 def __reduce__(self): 289 return 'sentinel.%s' % self.name 290 291 292class _Sentinel(object): 293 """Access attributes to return a named object, usable as a sentinel.""" 294 def __init__(self): 295 self._sentinels = {} 296 297 def __getattr__(self, name): 298 if name == '__bases__': 299 # Without this help(unittest.mock) raises an exception 300 raise AttributeError 301 return self._sentinels.setdefault(name, _SentinelObject(name)) 302 303 def __reduce__(self): 304 return 'sentinel' 305 306 307sentinel = _Sentinel() 308 309DEFAULT = sentinel.DEFAULT 310_missing = sentinel.MISSING 311_deleted = sentinel.DELETED 312 313 314_allowed_names = { 315 'return_value', '_mock_return_value', 'side_effect', 316 '_mock_side_effect', '_mock_parent', '_mock_new_parent', 317 '_mock_name', '_mock_new_name' 318} 319 320 321def _delegating_property(name): 322 _allowed_names.add(name) 323 _the_name = '_mock_' + name 324 def _get(self, name=name, _the_name=_the_name): 325 sig = self._mock_delegate 326 if sig is None: 327 return getattr(self, _the_name) 328 return getattr(sig, name) 329 def _set(self, value, name=name, _the_name=_the_name): 330 sig = self._mock_delegate 331 if sig is None: 332 self.__dict__[_the_name] = value 333 else: 334 setattr(sig, name, value) 335 336 return property(_get, _set) 337 338 339 340class _CallList(list): 341 342 def __contains__(self, value): 343 if not isinstance(value, list): 344 return list.__contains__(self, value) 345 len_value = len(value) 346 len_self = len(self) 347 if len_value > len_self: 348 return False 349 350 for i in range(0, len_self - len_value + 1): 351 sub_list = self[i:i+len_value] 352 if sub_list == value: 353 return True 354 return False 355 356 def __repr__(self): 357 return pprint.pformat(list(self)) 358 359 360def _check_and_set_parent(parent, value, name, new_name): 361 value = _extract_mock(value) 362 363 if not _is_instance_mock(value): 364 return False 365 if ((value._mock_name or value._mock_new_name) or 366 (value._mock_parent is not None) or 367 (value._mock_new_parent is not None)): 368 return False 369 370 _parent = parent 371 while _parent is not None: 372 # setting a mock (value) as a child or return value of itself 373 # should not modify the mock 374 if _parent is value: 375 return False 376 _parent = _parent._mock_new_parent 377 378 if new_name: 379 value._mock_new_parent = parent 380 value._mock_new_name = new_name 381 if name: 382 value._mock_parent = parent 383 value._mock_name = name 384 return True 385 386# Internal class to identify if we wrapped an iterator object or not. 387class _MockIter(object): 388 def __init__(self, obj): 389 self.obj = iter(obj) 390 def __next__(self): 391 return next(self.obj) 392 393class Base(object): 394 _mock_return_value = DEFAULT 395 _mock_side_effect = None 396 def __init__(self, /, *args, **kwargs): 397 pass 398 399 400 401class NonCallableMock(Base): 402 """A non-callable version of `Mock`""" 403 404 def __new__(cls, /, *args, **kw): 405 # every instance has its own class 406 # so we can create magic methods on the 407 # class without stomping on other mocks 408 bases = (cls,) 409 if not issubclass(cls, AsyncMockMixin): 410 # Check if spec is an async object or function 411 bound_args = _MOCK_SIG.bind_partial(cls, *args, **kw).arguments 412 spec_arg = bound_args.get('spec_set', bound_args.get('spec')) 413 if spec_arg is not None and _is_async_obj(spec_arg): 414 bases = (AsyncMockMixin, cls) 415 new = type(cls.__name__, bases, {'__doc__': cls.__doc__}) 416 instance = _safe_super(NonCallableMock, cls).__new__(new) 417 return instance 418 419 420 def __init__( 421 self, spec=None, wraps=None, name=None, spec_set=None, 422 parent=None, _spec_state=None, _new_name='', _new_parent=None, 423 _spec_as_instance=False, _eat_self=None, unsafe=False, **kwargs 424 ): 425 if _new_parent is None: 426 _new_parent = parent 427 428 __dict__ = self.__dict__ 429 __dict__['_mock_parent'] = parent 430 __dict__['_mock_name'] = name 431 __dict__['_mock_new_name'] = _new_name 432 __dict__['_mock_new_parent'] = _new_parent 433 __dict__['_mock_sealed'] = False 434 435 if spec_set is not None: 436 spec = spec_set 437 spec_set = True 438 if _eat_self is None: 439 _eat_self = parent is not None 440 441 self._mock_add_spec(spec, spec_set, _spec_as_instance, _eat_self) 442 443 __dict__['_mock_children'] = {} 444 __dict__['_mock_wraps'] = wraps 445 __dict__['_mock_delegate'] = None 446 447 __dict__['_mock_called'] = False 448 __dict__['_mock_call_args'] = None 449 __dict__['_mock_call_count'] = 0 450 __dict__['_mock_call_args_list'] = _CallList() 451 __dict__['_mock_mock_calls'] = _CallList() 452 453 __dict__['method_calls'] = _CallList() 454 __dict__['_mock_unsafe'] = unsafe 455 456 if kwargs: 457 self.configure_mock(**kwargs) 458 459 _safe_super(NonCallableMock, self).__init__( 460 spec, wraps, name, spec_set, parent, 461 _spec_state 462 ) 463 464 465 def attach_mock(self, mock, attribute): 466 """ 467 Attach a mock as an attribute of this one, replacing its name and 468 parent. Calls to the attached mock will be recorded in the 469 `method_calls` and `mock_calls` attributes of this one.""" 470 inner_mock = _extract_mock(mock) 471 472 inner_mock._mock_parent = None 473 inner_mock._mock_new_parent = None 474 inner_mock._mock_name = '' 475 inner_mock._mock_new_name = None 476 477 setattr(self, attribute, mock) 478 479 480 def mock_add_spec(self, spec, spec_set=False): 481 """Add a spec to a mock. `spec` can either be an object or a 482 list of strings. Only attributes on the `spec` can be fetched as 483 attributes from the mock. 484 485 If `spec_set` is True then only attributes on the spec can be set.""" 486 self._mock_add_spec(spec, spec_set) 487 488 489 def _mock_add_spec(self, spec, spec_set, _spec_as_instance=False, 490 _eat_self=False): 491 _spec_class = None 492 _spec_signature = None 493 _spec_asyncs = [] 494 495 for attr in dir(spec): 496 if iscoroutinefunction(getattr(spec, attr, None)): 497 _spec_asyncs.append(attr) 498 499 if spec is not None and not _is_list(spec): 500 if isinstance(spec, type): 501 _spec_class = spec 502 else: 503 _spec_class = type(spec) 504 res = _get_signature_object(spec, 505 _spec_as_instance, _eat_self) 506 _spec_signature = res and res[1] 507 508 spec = dir(spec) 509 510 __dict__ = self.__dict__ 511 __dict__['_spec_class'] = _spec_class 512 __dict__['_spec_set'] = spec_set 513 __dict__['_spec_signature'] = _spec_signature 514 __dict__['_mock_methods'] = spec 515 __dict__['_spec_asyncs'] = _spec_asyncs 516 517 def __get_return_value(self): 518 ret = self._mock_return_value 519 if self._mock_delegate is not None: 520 ret = self._mock_delegate.return_value 521 522 if ret is DEFAULT: 523 ret = self._get_child_mock( 524 _new_parent=self, _new_name='()' 525 ) 526 self.return_value = ret 527 return ret 528 529 530 def __set_return_value(self, value): 531 if self._mock_delegate is not None: 532 self._mock_delegate.return_value = value 533 else: 534 self._mock_return_value = value 535 _check_and_set_parent(self, value, None, '()') 536 537 __return_value_doc = "The value to be returned when the mock is called." 538 return_value = property(__get_return_value, __set_return_value, 539 __return_value_doc) 540 541 542 @property 543 def __class__(self): 544 if self._spec_class is None: 545 return type(self) 546 return self._spec_class 547 548 called = _delegating_property('called') 549 call_count = _delegating_property('call_count') 550 call_args = _delegating_property('call_args') 551 call_args_list = _delegating_property('call_args_list') 552 mock_calls = _delegating_property('mock_calls') 553 554 555 def __get_side_effect(self): 556 delegated = self._mock_delegate 557 if delegated is None: 558 return self._mock_side_effect 559 sf = delegated.side_effect 560 if (sf is not None and not callable(sf) 561 and not isinstance(sf, _MockIter) and not _is_exception(sf)): 562 sf = _MockIter(sf) 563 delegated.side_effect = sf 564 return sf 565 566 def __set_side_effect(self, value): 567 value = _try_iter(value) 568 delegated = self._mock_delegate 569 if delegated is None: 570 self._mock_side_effect = value 571 else: 572 delegated.side_effect = value 573 574 side_effect = property(__get_side_effect, __set_side_effect) 575 576 577 def reset_mock(self, visited=None,*, return_value=False, side_effect=False): 578 "Restore the mock object to its initial state." 579 if visited is None: 580 visited = [] 581 if id(self) in visited: 582 return 583 visited.append(id(self)) 584 585 self.called = False 586 self.call_args = None 587 self.call_count = 0 588 self.mock_calls = _CallList() 589 self.call_args_list = _CallList() 590 self.method_calls = _CallList() 591 592 if return_value: 593 self._mock_return_value = DEFAULT 594 if side_effect: 595 self._mock_side_effect = None 596 597 for child in self._mock_children.values(): 598 if isinstance(child, _SpecState) or child is _deleted: 599 continue 600 child.reset_mock(visited, return_value=return_value, side_effect=side_effect) 601 602 ret = self._mock_return_value 603 if _is_instance_mock(ret) and ret is not self: 604 ret.reset_mock(visited) 605 606 607 def configure_mock(self, /, **kwargs): 608 """Set attributes on the mock through keyword arguments. 609 610 Attributes plus return values and side effects can be set on child 611 mocks using standard dot notation and unpacking a dictionary in the 612 method call: 613 614 >>> attrs = {'method.return_value': 3, 'other.side_effect': KeyError} 615 >>> mock.configure_mock(**attrs)""" 616 for arg, val in sorted(kwargs.items(), 617 # we sort on the number of dots so that 618 # attributes are set before we set attributes on 619 # attributes 620 key=lambda entry: entry[0].count('.')): 621 args = arg.split('.') 622 final = args.pop() 623 obj = self 624 for entry in args: 625 obj = getattr(obj, entry) 626 setattr(obj, final, val) 627 628 629 def __getattr__(self, name): 630 if name in {'_mock_methods', '_mock_unsafe'}: 631 raise AttributeError(name) 632 elif self._mock_methods is not None: 633 if name not in self._mock_methods or name in _all_magics: 634 raise AttributeError("Mock object has no attribute %r" % name) 635 elif _is_magic(name): 636 raise AttributeError(name) 637 if not self._mock_unsafe: 638 if name.startswith(('assert', 'assret', 'asert', 'aseert', 'assrt')): 639 raise AttributeError( 640 f"{name!r} is not a valid assertion. Use a spec " 641 f"for the mock if {name!r} is meant to be an attribute.") 642 643 result = self._mock_children.get(name) 644 if result is _deleted: 645 raise AttributeError(name) 646 elif result is None: 647 wraps = None 648 if self._mock_wraps is not None: 649 # XXXX should we get the attribute without triggering code 650 # execution? 651 wraps = getattr(self._mock_wraps, name) 652 653 result = self._get_child_mock( 654 parent=self, name=name, wraps=wraps, _new_name=name, 655 _new_parent=self 656 ) 657 self._mock_children[name] = result 658 659 elif isinstance(result, _SpecState): 660 try: 661 result = create_autospec( 662 result.spec, result.spec_set, result.instance, 663 result.parent, result.name 664 ) 665 except InvalidSpecError: 666 target_name = self.__dict__['_mock_name'] or self 667 raise InvalidSpecError( 668 f'Cannot autospec attr {name!r} from target ' 669 f'{target_name!r} as it has already been mocked out. ' 670 f'[target={self!r}, attr={result.spec!r}]') 671 self._mock_children[name] = result 672 673 return result 674 675 676 def _extract_mock_name(self): 677 _name_list = [self._mock_new_name] 678 _parent = self._mock_new_parent 679 last = self 680 681 dot = '.' 682 if _name_list == ['()']: 683 dot = '' 684 685 while _parent is not None: 686 last = _parent 687 688 _name_list.append(_parent._mock_new_name + dot) 689 dot = '.' 690 if _parent._mock_new_name == '()': 691 dot = '' 692 693 _parent = _parent._mock_new_parent 694 695 _name_list = list(reversed(_name_list)) 696 _first = last._mock_name or 'mock' 697 if len(_name_list) > 1: 698 if _name_list[1] not in ('()', '().'): 699 _first += '.' 700 _name_list[0] = _first 701 return ''.join(_name_list) 702 703 def __repr__(self): 704 name = self._extract_mock_name() 705 706 name_string = '' 707 if name not in ('mock', 'mock.'): 708 name_string = ' name=%r' % name 709 710 spec_string = '' 711 if self._spec_class is not None: 712 spec_string = ' spec=%r' 713 if self._spec_set: 714 spec_string = ' spec_set=%r' 715 spec_string = spec_string % self._spec_class.__name__ 716 return "<%s%s%s id='%s'>" % ( 717 type(self).__name__, 718 name_string, 719 spec_string, 720 id(self) 721 ) 722 723 724 def __dir__(self): 725 """Filter the output of `dir(mock)` to only useful members.""" 726 if not FILTER_DIR: 727 return object.__dir__(self) 728 729 extras = self._mock_methods or [] 730 from_type = dir(type(self)) 731 from_dict = list(self.__dict__) 732 from_child_mocks = [ 733 m_name for m_name, m_value in self._mock_children.items() 734 if m_value is not _deleted] 735 736 from_type = [e for e in from_type if not e.startswith('_')] 737 from_dict = [e for e in from_dict if not e.startswith('_') or 738 _is_magic(e)] 739 return sorted(set(extras + from_type + from_dict + from_child_mocks)) 740 741 742 def __setattr__(self, name, value): 743 if name in _allowed_names: 744 # property setters go through here 745 return object.__setattr__(self, name, value) 746 elif (self._spec_set and self._mock_methods is not None and 747 name not in self._mock_methods and 748 name not in self.__dict__): 749 raise AttributeError("Mock object has no attribute '%s'" % name) 750 elif name in _unsupported_magics: 751 msg = 'Attempting to set unsupported magic method %r.' % name 752 raise AttributeError(msg) 753 elif name in _all_magics: 754 if self._mock_methods is not None and name not in self._mock_methods: 755 raise AttributeError("Mock object has no attribute '%s'" % name) 756 757 if not _is_instance_mock(value): 758 setattr(type(self), name, _get_method(name, value)) 759 original = value 760 value = lambda *args, **kw: original(self, *args, **kw) 761 else: 762 # only set _new_name and not name so that mock_calls is tracked 763 # but not method calls 764 _check_and_set_parent(self, value, None, name) 765 setattr(type(self), name, value) 766 self._mock_children[name] = value 767 elif name == '__class__': 768 self._spec_class = value 769 return 770 else: 771 if _check_and_set_parent(self, value, name, name): 772 self._mock_children[name] = value 773 774 if self._mock_sealed and not hasattr(self, name): 775 mock_name = f'{self._extract_mock_name()}.{name}' 776 raise AttributeError(f'Cannot set {mock_name}') 777 778 return object.__setattr__(self, name, value) 779 780 781 def __delattr__(self, name): 782 if name in _all_magics and name in type(self).__dict__: 783 delattr(type(self), name) 784 if name not in self.__dict__: 785 # for magic methods that are still MagicProxy objects and 786 # not set on the instance itself 787 return 788 789 obj = self._mock_children.get(name, _missing) 790 if name in self.__dict__: 791 _safe_super(NonCallableMock, self).__delattr__(name) 792 elif obj is _deleted: 793 raise AttributeError(name) 794 if obj is not _missing: 795 del self._mock_children[name] 796 self._mock_children[name] = _deleted 797 798 799 def _format_mock_call_signature(self, args, kwargs): 800 name = self._mock_name or 'mock' 801 return _format_call_signature(name, args, kwargs) 802 803 804 def _format_mock_failure_message(self, args, kwargs, action='call'): 805 message = 'expected %s not found.\nExpected: %s\nActual: %s' 806 expected_string = self._format_mock_call_signature(args, kwargs) 807 call_args = self.call_args 808 actual_string = self._format_mock_call_signature(*call_args) 809 return message % (action, expected_string, actual_string) 810 811 812 def _get_call_signature_from_name(self, name): 813 """ 814 * If call objects are asserted against a method/function like obj.meth1 815 then there could be no name for the call object to lookup. Hence just 816 return the spec_signature of the method/function being asserted against. 817 * If the name is not empty then remove () and split by '.' to get 818 list of names to iterate through the children until a potential 819 match is found. A child mock is created only during attribute access 820 so if we get a _SpecState then no attributes of the spec were accessed 821 and can be safely exited. 822 """ 823 if not name: 824 return self._spec_signature 825 826 sig = None 827 names = name.replace('()', '').split('.') 828 children = self._mock_children 829 830 for name in names: 831 child = children.get(name) 832 if child is None or isinstance(child, _SpecState): 833 break 834 else: 835 # If an autospecced object is attached using attach_mock the 836 # child would be a function with mock object as attribute from 837 # which signature has to be derived. 838 child = _extract_mock(child) 839 children = child._mock_children 840 sig = child._spec_signature 841 842 return sig 843 844 845 def _call_matcher(self, _call): 846 """ 847 Given a call (or simply an (args, kwargs) tuple), return a 848 comparison key suitable for matching with other calls. 849 This is a best effort method which relies on the spec's signature, 850 if available, or falls back on the arguments themselves. 851 """ 852 853 if isinstance(_call, tuple) and len(_call) > 2: 854 sig = self._get_call_signature_from_name(_call[0]) 855 else: 856 sig = self._spec_signature 857 858 if sig is not None: 859 if len(_call) == 2: 860 name = '' 861 args, kwargs = _call 862 else: 863 name, args, kwargs = _call 864 try: 865 bound_call = sig.bind(*args, **kwargs) 866 return call(name, bound_call.args, bound_call.kwargs) 867 except TypeError as e: 868 return e.with_traceback(None) 869 else: 870 return _call 871 872 def assert_not_called(self): 873 """assert that the mock was never called. 874 """ 875 if self.call_count != 0: 876 msg = ("Expected '%s' to not have been called. Called %s times.%s" 877 % (self._mock_name or 'mock', 878 self.call_count, 879 self._calls_repr())) 880 raise AssertionError(msg) 881 882 def assert_called(self): 883 """assert that the mock was called at least once 884 """ 885 if self.call_count == 0: 886 msg = ("Expected '%s' to have been called." % 887 (self._mock_name or 'mock')) 888 raise AssertionError(msg) 889 890 def assert_called_once(self): 891 """assert that the mock was called only once. 892 """ 893 if not self.call_count == 1: 894 msg = ("Expected '%s' to have been called once. Called %s times.%s" 895 % (self._mock_name or 'mock', 896 self.call_count, 897 self._calls_repr())) 898 raise AssertionError(msg) 899 900 def assert_called_with(self, /, *args, **kwargs): 901 """assert that the last call was made with the specified arguments. 902 903 Raises an AssertionError if the args and keyword args passed in are 904 different to the last call to the mock.""" 905 if self.call_args is None: 906 expected = self._format_mock_call_signature(args, kwargs) 907 actual = 'not called.' 908 error_message = ('expected call not found.\nExpected: %s\nActual: %s' 909 % (expected, actual)) 910 raise AssertionError(error_message) 911 912 def _error_message(): 913 msg = self._format_mock_failure_message(args, kwargs) 914 return msg 915 expected = self._call_matcher(_Call((args, kwargs), two=True)) 916 actual = self._call_matcher(self.call_args) 917 if actual != expected: 918 cause = expected if isinstance(expected, Exception) else None 919 raise AssertionError(_error_message()) from cause 920 921 922 def assert_called_once_with(self, /, *args, **kwargs): 923 """assert that the mock was called exactly once and that that call was 924 with the specified arguments.""" 925 if not self.call_count == 1: 926 msg = ("Expected '%s' to be called once. Called %s times.%s" 927 % (self._mock_name or 'mock', 928 self.call_count, 929 self._calls_repr())) 930 raise AssertionError(msg) 931 return self.assert_called_with(*args, **kwargs) 932 933 934 def assert_has_calls(self, calls, any_order=False): 935 """assert the mock has been called with the specified calls. 936 The `mock_calls` list is checked for the calls. 937 938 If `any_order` is False (the default) then the calls must be 939 sequential. There can be extra calls before or after the 940 specified calls. 941 942 If `any_order` is True then the calls can be in any order, but 943 they must all appear in `mock_calls`.""" 944 expected = [self._call_matcher(c) for c in calls] 945 cause = next((e for e in expected if isinstance(e, Exception)), None) 946 all_calls = _CallList(self._call_matcher(c) for c in self.mock_calls) 947 if not any_order: 948 if expected not in all_calls: 949 if cause is None: 950 problem = 'Calls not found.' 951 else: 952 problem = ('Error processing expected calls.\n' 953 'Errors: {}').format( 954 [e if isinstance(e, Exception) else None 955 for e in expected]) 956 raise AssertionError( 957 f'{problem}\n' 958 f'Expected: {_CallList(calls)}' 959 f'{self._calls_repr(prefix="Actual").rstrip(".")}' 960 ) from cause 961 return 962 963 all_calls = list(all_calls) 964 965 not_found = [] 966 for kall in expected: 967 try: 968 all_calls.remove(kall) 969 except ValueError: 970 not_found.append(kall) 971 if not_found: 972 raise AssertionError( 973 '%r does not contain all of %r in its call list, ' 974 'found %r instead' % (self._mock_name or 'mock', 975 tuple(not_found), all_calls) 976 ) from cause 977 978 979 def assert_any_call(self, /, *args, **kwargs): 980 """assert the mock has been called with the specified arguments. 981 982 The assert passes if the mock has *ever* been called, unlike 983 `assert_called_with` and `assert_called_once_with` that only pass if 984 the call is the most recent one.""" 985 expected = self._call_matcher(_Call((args, kwargs), two=True)) 986 cause = expected if isinstance(expected, Exception) else None 987 actual = [self._call_matcher(c) for c in self.call_args_list] 988 if cause or expected not in _AnyComparer(actual): 989 expected_string = self._format_mock_call_signature(args, kwargs) 990 raise AssertionError( 991 '%s call not found' % expected_string 992 ) from cause 993 994 995 def _get_child_mock(self, /, **kw): 996 """Create the child mocks for attributes and return value. 997 By default child mocks will be the same type as the parent. 998 Subclasses of Mock may want to override this to customize the way 999 child mocks are made. 1000 1001 For non-callable mocks the callable variant will be used (rather than 1002 any custom subclass).""" 1003 _new_name = kw.get("_new_name") 1004 if _new_name in self.__dict__['_spec_asyncs']: 1005 return AsyncMock(**kw) 1006 1007 if self._mock_sealed: 1008 attribute = f".{kw['name']}" if "name" in kw else "()" 1009 mock_name = self._extract_mock_name() + attribute 1010 raise AttributeError(mock_name) 1011 1012 _type = type(self) 1013 if issubclass(_type, MagicMock) and _new_name in _async_method_magics: 1014 # Any asynchronous magic becomes an AsyncMock 1015 klass = AsyncMock 1016 elif issubclass(_type, AsyncMockMixin): 1017 if (_new_name in _all_sync_magics or 1018 self._mock_methods and _new_name in self._mock_methods): 1019 # Any synchronous method on AsyncMock becomes a MagicMock 1020 klass = MagicMock 1021 else: 1022 klass = AsyncMock 1023 elif not issubclass(_type, CallableMixin): 1024 if issubclass(_type, NonCallableMagicMock): 1025 klass = MagicMock 1026 elif issubclass(_type, NonCallableMock): 1027 klass = Mock 1028 else: 1029 klass = _type.__mro__[1] 1030 return klass(**kw) 1031 1032 1033 def _calls_repr(self, prefix="Calls"): 1034 """Renders self.mock_calls as a string. 1035 1036 Example: "\nCalls: [call(1), call(2)]." 1037 1038 If self.mock_calls is empty, an empty string is returned. The 1039 output will be truncated if very long. 1040 """ 1041 if not self.mock_calls: 1042 return "" 1043 return f"\n{prefix}: {safe_repr(self.mock_calls)}." 1044 1045 1046_MOCK_SIG = inspect.signature(NonCallableMock.__init__) 1047 1048 1049class _AnyComparer(list): 1050 """A list which checks if it contains a call which may have an 1051 argument of ANY, flipping the components of item and self from 1052 their traditional locations so that ANY is guaranteed to be on 1053 the left.""" 1054 def __contains__(self, item): 1055 for _call in self: 1056 assert len(item) == len(_call) 1057 if all([ 1058 expected == actual 1059 for expected, actual in zip(item, _call) 1060 ]): 1061 return True 1062 return False 1063 1064 1065def _try_iter(obj): 1066 if obj is None: 1067 return obj 1068 if _is_exception(obj): 1069 return obj 1070 if _callable(obj): 1071 return obj 1072 try: 1073 return iter(obj) 1074 except TypeError: 1075 # XXXX backwards compatibility 1076 # but this will blow up on first call - so maybe we should fail early? 1077 return obj 1078 1079 1080class CallableMixin(Base): 1081 1082 def __init__(self, spec=None, side_effect=None, return_value=DEFAULT, 1083 wraps=None, name=None, spec_set=None, parent=None, 1084 _spec_state=None, _new_name='', _new_parent=None, **kwargs): 1085 self.__dict__['_mock_return_value'] = return_value 1086 _safe_super(CallableMixin, self).__init__( 1087 spec, wraps, name, spec_set, parent, 1088 _spec_state, _new_name, _new_parent, **kwargs 1089 ) 1090 1091 self.side_effect = side_effect 1092 1093 1094 def _mock_check_sig(self, /, *args, **kwargs): 1095 # stub method that can be replaced with one with a specific signature 1096 pass 1097 1098 1099 def __call__(self, /, *args, **kwargs): 1100 # can't use self in-case a function / method we are mocking uses self 1101 # in the signature 1102 self._mock_check_sig(*args, **kwargs) 1103 self._increment_mock_call(*args, **kwargs) 1104 return self._mock_call(*args, **kwargs) 1105 1106 1107 def _mock_call(self, /, *args, **kwargs): 1108 return self._execute_mock_call(*args, **kwargs) 1109 1110 def _increment_mock_call(self, /, *args, **kwargs): 1111 self.called = True 1112 self.call_count += 1 1113 1114 # handle call_args 1115 # needs to be set here so assertions on call arguments pass before 1116 # execution in the case of awaited calls 1117 _call = _Call((args, kwargs), two=True) 1118 self.call_args = _call 1119 self.call_args_list.append(_call) 1120 1121 # initial stuff for method_calls: 1122 do_method_calls = self._mock_parent is not None 1123 method_call_name = self._mock_name 1124 1125 # initial stuff for mock_calls: 1126 mock_call_name = self._mock_new_name 1127 is_a_call = mock_call_name == '()' 1128 self.mock_calls.append(_Call(('', args, kwargs))) 1129 1130 # follow up the chain of mocks: 1131 _new_parent = self._mock_new_parent 1132 while _new_parent is not None: 1133 1134 # handle method_calls: 1135 if do_method_calls: 1136 _new_parent.method_calls.append(_Call((method_call_name, args, kwargs))) 1137 do_method_calls = _new_parent._mock_parent is not None 1138 if do_method_calls: 1139 method_call_name = _new_parent._mock_name + '.' + method_call_name 1140 1141 # handle mock_calls: 1142 this_mock_call = _Call((mock_call_name, args, kwargs)) 1143 _new_parent.mock_calls.append(this_mock_call) 1144 1145 if _new_parent._mock_new_name: 1146 if is_a_call: 1147 dot = '' 1148 else: 1149 dot = '.' 1150 is_a_call = _new_parent._mock_new_name == '()' 1151 mock_call_name = _new_parent._mock_new_name + dot + mock_call_name 1152 1153 # follow the parental chain: 1154 _new_parent = _new_parent._mock_new_parent 1155 1156 def _execute_mock_call(self, /, *args, **kwargs): 1157 # separate from _increment_mock_call so that awaited functions are 1158 # executed separately from their call, also AsyncMock overrides this method 1159 1160 effect = self.side_effect 1161 if effect is not None: 1162 if _is_exception(effect): 1163 raise effect 1164 elif not _callable(effect): 1165 result = next(effect) 1166 if _is_exception(result): 1167 raise result 1168 else: 1169 result = effect(*args, **kwargs) 1170 1171 if result is not DEFAULT: 1172 return result 1173 1174 if self._mock_return_value is not DEFAULT: 1175 return self.return_value 1176 1177 if self._mock_wraps is not None: 1178 return self._mock_wraps(*args, **kwargs) 1179 1180 return self.return_value 1181 1182 1183 1184class Mock(CallableMixin, NonCallableMock): 1185 """ 1186 Create a new `Mock` object. `Mock` takes several optional arguments 1187 that specify the behaviour of the Mock object: 1188 1189 * `spec`: This can be either a list of strings or an existing object (a 1190 class or instance) that acts as the specification for the mock object. If 1191 you pass in an object then a list of strings is formed by calling dir on 1192 the object (excluding unsupported magic attributes and methods). Accessing 1193 any attribute not in this list will raise an `AttributeError`. 1194 1195 If `spec` is an object (rather than a list of strings) then 1196 `mock.__class__` returns the class of the spec object. This allows mocks 1197 to pass `isinstance` tests. 1198 1199 * `spec_set`: A stricter variant of `spec`. If used, attempting to *set* 1200 or get an attribute on the mock that isn't on the object passed as 1201 `spec_set` will raise an `AttributeError`. 1202 1203 * `side_effect`: A function to be called whenever the Mock is called. See 1204 the `side_effect` attribute. Useful for raising exceptions or 1205 dynamically changing return values. The function is called with the same 1206 arguments as the mock, and unless it returns `DEFAULT`, the return 1207 value of this function is used as the return value. 1208 1209 If `side_effect` is an iterable then each call to the mock will return 1210 the next value from the iterable. If any of the members of the iterable 1211 are exceptions they will be raised instead of returned. 1212 1213 * `return_value`: The value returned when the mock is called. By default 1214 this is a new Mock (created on first access). See the 1215 `return_value` attribute. 1216 1217 * `wraps`: Item for the mock object to wrap. If `wraps` is not None then 1218 calling the Mock will pass the call through to the wrapped object 1219 (returning the real result). Attribute access on the mock will return a 1220 Mock object that wraps the corresponding attribute of the wrapped object 1221 (so attempting to access an attribute that doesn't exist will raise an 1222 `AttributeError`). 1223 1224 If the mock has an explicit `return_value` set then calls are not passed 1225 to the wrapped object and the `return_value` is returned instead. 1226 1227 * `name`: If the mock has a name then it will be used in the repr of the 1228 mock. This can be useful for debugging. The name is propagated to child 1229 mocks. 1230 1231 Mocks can also be called with arbitrary keyword arguments. These will be 1232 used to set attributes on the mock after it is created. 1233 """ 1234 1235 1236def _dot_lookup(thing, comp, import_path): 1237 try: 1238 return getattr(thing, comp) 1239 except AttributeError: 1240 __import__(import_path) 1241 return getattr(thing, comp) 1242 1243 1244def _importer(target): 1245 components = target.split('.') 1246 import_path = components.pop(0) 1247 thing = __import__(import_path) 1248 1249 for comp in components: 1250 import_path += ".%s" % comp 1251 thing = _dot_lookup(thing, comp, import_path) 1252 return thing 1253 1254 1255# _check_spec_arg_typos takes kwargs from commands like patch and checks that 1256# they don't contain common misspellings of arguments related to autospeccing. 1257def _check_spec_arg_typos(kwargs_to_check): 1258 typos = ("autospect", "auto_spec", "set_spec") 1259 for typo in typos: 1260 if typo in kwargs_to_check: 1261 raise RuntimeError( 1262 f"{typo!r} might be a typo; use unsafe=True if this is intended" 1263 ) 1264 1265 1266class _patch(object): 1267 1268 attribute_name = None 1269 _active_patches = [] 1270 1271 def __init__( 1272 self, getter, attribute, new, spec, create, 1273 spec_set, autospec, new_callable, kwargs, *, unsafe=False 1274 ): 1275 if new_callable is not None: 1276 if new is not DEFAULT: 1277 raise ValueError( 1278 "Cannot use 'new' and 'new_callable' together" 1279 ) 1280 if autospec is not None: 1281 raise ValueError( 1282 "Cannot use 'autospec' and 'new_callable' together" 1283 ) 1284 if not unsafe: 1285 _check_spec_arg_typos(kwargs) 1286 if _is_instance_mock(spec): 1287 raise InvalidSpecError( 1288 f'Cannot spec attr {attribute!r} as the spec ' 1289 f'has already been mocked out. [spec={spec!r}]') 1290 if _is_instance_mock(spec_set): 1291 raise InvalidSpecError( 1292 f'Cannot spec attr {attribute!r} as the spec_set ' 1293 f'target has already been mocked out. [spec_set={spec_set!r}]') 1294 1295 self.getter = getter 1296 self.attribute = attribute 1297 self.new = new 1298 self.new_callable = new_callable 1299 self.spec = spec 1300 self.create = create 1301 self.has_local = False 1302 self.spec_set = spec_set 1303 self.autospec = autospec 1304 self.kwargs = kwargs 1305 self.additional_patchers = [] 1306 1307 1308 def copy(self): 1309 patcher = _patch( 1310 self.getter, self.attribute, self.new, self.spec, 1311 self.create, self.spec_set, 1312 self.autospec, self.new_callable, self.kwargs 1313 ) 1314 patcher.attribute_name = self.attribute_name 1315 patcher.additional_patchers = [ 1316 p.copy() for p in self.additional_patchers 1317 ] 1318 return patcher 1319 1320 1321 def __call__(self, func): 1322 if isinstance(func, type): 1323 return self.decorate_class(func) 1324 if inspect.iscoroutinefunction(func): 1325 return self.decorate_async_callable(func) 1326 return self.decorate_callable(func) 1327 1328 1329 def decorate_class(self, klass): 1330 for attr in dir(klass): 1331 if not attr.startswith(patch.TEST_PREFIX): 1332 continue 1333 1334 attr_value = getattr(klass, attr) 1335 if not hasattr(attr_value, "__call__"): 1336 continue 1337 1338 patcher = self.copy() 1339 setattr(klass, attr, patcher(attr_value)) 1340 return klass 1341 1342 1343 @contextlib.contextmanager 1344 def decoration_helper(self, patched, args, keywargs): 1345 extra_args = [] 1346 with contextlib.ExitStack() as exit_stack: 1347 for patching in patched.patchings: 1348 arg = exit_stack.enter_context(patching) 1349 if patching.attribute_name is not None: 1350 keywargs.update(arg) 1351 elif patching.new is DEFAULT: 1352 extra_args.append(arg) 1353 1354 args += tuple(extra_args) 1355 yield (args, keywargs) 1356 1357 1358 def decorate_callable(self, func): 1359 # NB. Keep the method in sync with decorate_async_callable() 1360 if hasattr(func, 'patchings'): 1361 func.patchings.append(self) 1362 return func 1363 1364 @wraps(func) 1365 def patched(*args, **keywargs): 1366 with self.decoration_helper(patched, 1367 args, 1368 keywargs) as (newargs, newkeywargs): 1369 return func(*newargs, **newkeywargs) 1370 1371 patched.patchings = [self] 1372 return patched 1373 1374 1375 def decorate_async_callable(self, func): 1376 # NB. Keep the method in sync with decorate_callable() 1377 if hasattr(func, 'patchings'): 1378 func.patchings.append(self) 1379 return func 1380 1381 @wraps(func) 1382 async def patched(*args, **keywargs): 1383 with self.decoration_helper(patched, 1384 args, 1385 keywargs) as (newargs, newkeywargs): 1386 return await func(*newargs, **newkeywargs) 1387 1388 patched.patchings = [self] 1389 return patched 1390 1391 1392 def get_original(self): 1393 target = self.getter() 1394 name = self.attribute 1395 1396 original = DEFAULT 1397 local = False 1398 1399 try: 1400 original = target.__dict__[name] 1401 except (AttributeError, KeyError): 1402 original = getattr(target, name, DEFAULT) 1403 else: 1404 local = True 1405 1406 if name in _builtins and isinstance(target, ModuleType): 1407 self.create = True 1408 1409 if not self.create and original is DEFAULT: 1410 raise AttributeError( 1411 "%s does not have the attribute %r" % (target, name) 1412 ) 1413 return original, local 1414 1415 1416 def __enter__(self): 1417 """Perform the patch.""" 1418 new, spec, spec_set = self.new, self.spec, self.spec_set 1419 autospec, kwargs = self.autospec, self.kwargs 1420 new_callable = self.new_callable 1421 self.target = self.getter() 1422 1423 # normalise False to None 1424 if spec is False: 1425 spec = None 1426 if spec_set is False: 1427 spec_set = None 1428 if autospec is False: 1429 autospec = None 1430 1431 if spec is not None and autospec is not None: 1432 raise TypeError("Can't specify spec and autospec") 1433 if ((spec is not None or autospec is not None) and 1434 spec_set not in (True, None)): 1435 raise TypeError("Can't provide explicit spec_set *and* spec or autospec") 1436 1437 original, local = self.get_original() 1438 1439 if new is DEFAULT and autospec is None: 1440 inherit = False 1441 if spec is True: 1442 # set spec to the object we are replacing 1443 spec = original 1444 if spec_set is True: 1445 spec_set = original 1446 spec = None 1447 elif spec is not None: 1448 if spec_set is True: 1449 spec_set = spec 1450 spec = None 1451 elif spec_set is True: 1452 spec_set = original 1453 1454 if spec is not None or spec_set is not None: 1455 if original is DEFAULT: 1456 raise TypeError("Can't use 'spec' with create=True") 1457 if isinstance(original, type): 1458 # If we're patching out a class and there is a spec 1459 inherit = True 1460 if spec is None and _is_async_obj(original): 1461 Klass = AsyncMock 1462 else: 1463 Klass = MagicMock 1464 _kwargs = {} 1465 if new_callable is not None: 1466 Klass = new_callable 1467 elif spec is not None or spec_set is not None: 1468 this_spec = spec 1469 if spec_set is not None: 1470 this_spec = spec_set 1471 if _is_list(this_spec): 1472 not_callable = '__call__' not in this_spec 1473 else: 1474 not_callable = not callable(this_spec) 1475 if _is_async_obj(this_spec): 1476 Klass = AsyncMock 1477 elif not_callable: 1478 Klass = NonCallableMagicMock 1479 1480 if spec is not None: 1481 _kwargs['spec'] = spec 1482 if spec_set is not None: 1483 _kwargs['spec_set'] = spec_set 1484 1485 # add a name to mocks 1486 if (isinstance(Klass, type) and 1487 issubclass(Klass, NonCallableMock) and self.attribute): 1488 _kwargs['name'] = self.attribute 1489 1490 _kwargs.update(kwargs) 1491 new = Klass(**_kwargs) 1492 1493 if inherit and _is_instance_mock(new): 1494 # we can only tell if the instance should be callable if the 1495 # spec is not a list 1496 this_spec = spec 1497 if spec_set is not None: 1498 this_spec = spec_set 1499 if (not _is_list(this_spec) and not 1500 _instance_callable(this_spec)): 1501 Klass = NonCallableMagicMock 1502 1503 _kwargs.pop('name') 1504 new.return_value = Klass(_new_parent=new, _new_name='()', 1505 **_kwargs) 1506 elif autospec is not None: 1507 # spec is ignored, new *must* be default, spec_set is treated 1508 # as a boolean. Should we check spec is not None and that spec_set 1509 # is a bool? 1510 if new is not DEFAULT: 1511 raise TypeError( 1512 "autospec creates the mock for you. Can't specify " 1513 "autospec and new." 1514 ) 1515 if original is DEFAULT: 1516 raise TypeError("Can't use 'autospec' with create=True") 1517 spec_set = bool(spec_set) 1518 if autospec is True: 1519 autospec = original 1520 1521 if _is_instance_mock(self.target): 1522 raise InvalidSpecError( 1523 f'Cannot autospec attr {self.attribute!r} as the patch ' 1524 f'target has already been mocked out. ' 1525 f'[target={self.target!r}, attr={autospec!r}]') 1526 if _is_instance_mock(autospec): 1527 target_name = getattr(self.target, '__name__', self.target) 1528 raise InvalidSpecError( 1529 f'Cannot autospec attr {self.attribute!r} from target ' 1530 f'{target_name!r} as it has already been mocked out. ' 1531 f'[target={self.target!r}, attr={autospec!r}]') 1532 1533 new = create_autospec(autospec, spec_set=spec_set, 1534 _name=self.attribute, **kwargs) 1535 elif kwargs: 1536 # can't set keyword args when we aren't creating the mock 1537 # XXXX If new is a Mock we could call new.configure_mock(**kwargs) 1538 raise TypeError("Can't pass kwargs to a mock we aren't creating") 1539 1540 new_attr = new 1541 1542 self.temp_original = original 1543 self.is_local = local 1544 self._exit_stack = contextlib.ExitStack() 1545 try: 1546 setattr(self.target, self.attribute, new_attr) 1547 if self.attribute_name is not None: 1548 extra_args = {} 1549 if self.new is DEFAULT: 1550 extra_args[self.attribute_name] = new 1551 for patching in self.additional_patchers: 1552 arg = self._exit_stack.enter_context(patching) 1553 if patching.new is DEFAULT: 1554 extra_args.update(arg) 1555 return extra_args 1556 1557 return new 1558 except: 1559 if not self.__exit__(*sys.exc_info()): 1560 raise 1561 1562 def __exit__(self, *exc_info): 1563 """Undo the patch.""" 1564 if self.is_local and self.temp_original is not DEFAULT: 1565 setattr(self.target, self.attribute, self.temp_original) 1566 else: 1567 delattr(self.target, self.attribute) 1568 if not self.create and (not hasattr(self.target, self.attribute) or 1569 self.attribute in ('__doc__', '__module__', 1570 '__defaults__', '__annotations__', 1571 '__kwdefaults__')): 1572 # needed for proxy objects like django settings 1573 setattr(self.target, self.attribute, self.temp_original) 1574 1575 del self.temp_original 1576 del self.is_local 1577 del self.target 1578 exit_stack = self._exit_stack 1579 del self._exit_stack 1580 return exit_stack.__exit__(*exc_info) 1581 1582 1583 def start(self): 1584 """Activate a patch, returning any created mock.""" 1585 result = self.__enter__() 1586 self._active_patches.append(self) 1587 return result 1588 1589 1590 def stop(self): 1591 """Stop an active patch.""" 1592 try: 1593 self._active_patches.remove(self) 1594 except ValueError: 1595 # If the patch hasn't been started this will fail 1596 return None 1597 1598 return self.__exit__(None, None, None) 1599 1600 1601 1602def _get_target(target): 1603 try: 1604 target, attribute = target.rsplit('.', 1) 1605 except (TypeError, ValueError): 1606 raise TypeError("Need a valid target to patch. You supplied: %r" % 1607 (target,)) 1608 getter = lambda: _importer(target) 1609 return getter, attribute 1610 1611 1612def _patch_object( 1613 target, attribute, new=DEFAULT, spec=None, 1614 create=False, spec_set=None, autospec=None, 1615 new_callable=None, *, unsafe=False, **kwargs 1616 ): 1617 """ 1618 patch the named member (`attribute`) on an object (`target`) with a mock 1619 object. 1620 1621 `patch.object` can be used as a decorator, class decorator or a context 1622 manager. Arguments `new`, `spec`, `create`, `spec_set`, 1623 `autospec` and `new_callable` have the same meaning as for `patch`. Like 1624 `patch`, `patch.object` takes arbitrary keyword arguments for configuring 1625 the mock object it creates. 1626 1627 When used as a class decorator `patch.object` honours `patch.TEST_PREFIX` 1628 for choosing which methods to wrap. 1629 """ 1630 if type(target) is str: 1631 raise TypeError( 1632 f"{target!r} must be the actual object to be patched, not a str" 1633 ) 1634 getter = lambda: target 1635 return _patch( 1636 getter, attribute, new, spec, create, 1637 spec_set, autospec, new_callable, kwargs, unsafe=unsafe 1638 ) 1639 1640 1641def _patch_multiple(target, spec=None, create=False, spec_set=None, 1642 autospec=None, new_callable=None, **kwargs): 1643 """Perform multiple patches in a single call. It takes the object to be 1644 patched (either as an object or a string to fetch the object by importing) 1645 and keyword arguments for the patches:: 1646 1647 with patch.multiple(settings, FIRST_PATCH='one', SECOND_PATCH='two'): 1648 ... 1649 1650 Use `DEFAULT` as the value if you want `patch.multiple` to create 1651 mocks for you. In this case the created mocks are passed into a decorated 1652 function by keyword, and a dictionary is returned when `patch.multiple` is 1653 used as a context manager. 1654 1655 `patch.multiple` can be used as a decorator, class decorator or a context 1656 manager. The arguments `spec`, `spec_set`, `create`, 1657 `autospec` and `new_callable` have the same meaning as for `patch`. These 1658 arguments will be applied to *all* patches done by `patch.multiple`. 1659 1660 When used as a class decorator `patch.multiple` honours `patch.TEST_PREFIX` 1661 for choosing which methods to wrap. 1662 """ 1663 if type(target) is str: 1664 getter = lambda: _importer(target) 1665 else: 1666 getter = lambda: target 1667 1668 if not kwargs: 1669 raise ValueError( 1670 'Must supply at least one keyword argument with patch.multiple' 1671 ) 1672 # need to wrap in a list for python 3, where items is a view 1673 items = list(kwargs.items()) 1674 attribute, new = items[0] 1675 patcher = _patch( 1676 getter, attribute, new, spec, create, spec_set, 1677 autospec, new_callable, {} 1678 ) 1679 patcher.attribute_name = attribute 1680 for attribute, new in items[1:]: 1681 this_patcher = _patch( 1682 getter, attribute, new, spec, create, spec_set, 1683 autospec, new_callable, {} 1684 ) 1685 this_patcher.attribute_name = attribute 1686 patcher.additional_patchers.append(this_patcher) 1687 return patcher 1688 1689 1690def patch( 1691 target, new=DEFAULT, spec=None, create=False, 1692 spec_set=None, autospec=None, new_callable=None, *, unsafe=False, **kwargs 1693 ): 1694 """ 1695 `patch` acts as a function decorator, class decorator or a context 1696 manager. Inside the body of the function or with statement, the `target` 1697 is patched with a `new` object. When the function/with statement exits 1698 the patch is undone. 1699 1700 If `new` is omitted, then the target is replaced with an 1701 `AsyncMock if the patched object is an async function or a 1702 `MagicMock` otherwise. If `patch` is used as a decorator and `new` is 1703 omitted, the created mock is passed in as an extra argument to the 1704 decorated function. If `patch` is used as a context manager the created 1705 mock is returned by the context manager. 1706 1707 `target` should be a string in the form `'package.module.ClassName'`. The 1708 `target` is imported and the specified object replaced with the `new` 1709 object, so the `target` must be importable from the environment you are 1710 calling `patch` from. The target is imported when the decorated function 1711 is executed, not at decoration time. 1712 1713 The `spec` and `spec_set` keyword arguments are passed to the `MagicMock` 1714 if patch is creating one for you. 1715 1716 In addition you can pass `spec=True` or `spec_set=True`, which causes 1717 patch to pass in the object being mocked as the spec/spec_set object. 1718 1719 `new_callable` allows you to specify a different class, or callable object, 1720 that will be called to create the `new` object. By default `AsyncMock` is 1721 used for async functions and `MagicMock` for the rest. 1722 1723 A more powerful form of `spec` is `autospec`. If you set `autospec=True` 1724 then the mock will be created with a spec from the object being replaced. 1725 All attributes of the mock will also have the spec of the corresponding 1726 attribute of the object being replaced. Methods and functions being 1727 mocked will have their arguments checked and will raise a `TypeError` if 1728 they are called with the wrong signature. For mocks replacing a class, 1729 their return value (the 'instance') will have the same spec as the class. 1730 1731 Instead of `autospec=True` you can pass `autospec=some_object` to use an 1732 arbitrary object as the spec instead of the one being replaced. 1733 1734 By default `patch` will fail to replace attributes that don't exist. If 1735 you pass in `create=True`, and the attribute doesn't exist, patch will 1736 create the attribute for you when the patched function is called, and 1737 delete it again afterwards. This is useful for writing tests against 1738 attributes that your production code creates at runtime. It is off by 1739 default because it can be dangerous. With it switched on you can write 1740 passing tests against APIs that don't actually exist! 1741 1742 Patch can be used as a `TestCase` class decorator. It works by 1743 decorating each test method in the class. This reduces the boilerplate 1744 code when your test methods share a common patchings set. `patch` finds 1745 tests by looking for method names that start with `patch.TEST_PREFIX`. 1746 By default this is `test`, which matches the way `unittest` finds tests. 1747 You can specify an alternative prefix by setting `patch.TEST_PREFIX`. 1748 1749 Patch can be used as a context manager, with the with statement. Here the 1750 patching applies to the indented block after the with statement. If you 1751 use "as" then the patched object will be bound to the name after the 1752 "as"; very useful if `patch` is creating a mock object for you. 1753 1754 Patch will raise a `RuntimeError` if passed some common misspellings of 1755 the arguments autospec and spec_set. Pass the argument `unsafe` with the 1756 value True to disable that check. 1757 1758 `patch` takes arbitrary keyword arguments. These will be passed to 1759 `AsyncMock` if the patched object is asynchronous, to `MagicMock` 1760 otherwise or to `new_callable` if specified. 1761 1762 `patch.dict(...)`, `patch.multiple(...)` and `patch.object(...)` are 1763 available for alternate use-cases. 1764 """ 1765 getter, attribute = _get_target(target) 1766 return _patch( 1767 getter, attribute, new, spec, create, 1768 spec_set, autospec, new_callable, kwargs, unsafe=unsafe 1769 ) 1770 1771 1772class _patch_dict(object): 1773 """ 1774 Patch a dictionary, or dictionary like object, and restore the dictionary 1775 to its original state after the test. 1776 1777 `in_dict` can be a dictionary or a mapping like container. If it is a 1778 mapping then it must at least support getting, setting and deleting items 1779 plus iterating over keys. 1780 1781 `in_dict` can also be a string specifying the name of the dictionary, which 1782 will then be fetched by importing it. 1783 1784 `values` can be a dictionary of values to set in the dictionary. `values` 1785 can also be an iterable of `(key, value)` pairs. 1786 1787 If `clear` is True then the dictionary will be cleared before the new 1788 values are set. 1789 1790 `patch.dict` can also be called with arbitrary keyword arguments to set 1791 values in the dictionary:: 1792 1793 with patch.dict('sys.modules', mymodule=Mock(), other_module=Mock()): 1794 ... 1795 1796 `patch.dict` can be used as a context manager, decorator or class 1797 decorator. When used as a class decorator `patch.dict` honours 1798 `patch.TEST_PREFIX` for choosing which methods to wrap. 1799 """ 1800 1801 def __init__(self, in_dict, values=(), clear=False, **kwargs): 1802 self.in_dict = in_dict 1803 # support any argument supported by dict(...) constructor 1804 self.values = dict(values) 1805 self.values.update(kwargs) 1806 self.clear = clear 1807 self._original = None 1808 1809 1810 def __call__(self, f): 1811 if isinstance(f, type): 1812 return self.decorate_class(f) 1813 @wraps(f) 1814 def _inner(*args, **kw): 1815 self._patch_dict() 1816 try: 1817 return f(*args, **kw) 1818 finally: 1819 self._unpatch_dict() 1820 1821 return _inner 1822 1823 1824 def decorate_class(self, klass): 1825 for attr in dir(klass): 1826 attr_value = getattr(klass, attr) 1827 if (attr.startswith(patch.TEST_PREFIX) and 1828 hasattr(attr_value, "__call__")): 1829 decorator = _patch_dict(self.in_dict, self.values, self.clear) 1830 decorated = decorator(attr_value) 1831 setattr(klass, attr, decorated) 1832 return klass 1833 1834 1835 def __enter__(self): 1836 """Patch the dict.""" 1837 self._patch_dict() 1838 return self.in_dict 1839 1840 1841 def _patch_dict(self): 1842 values = self.values 1843 if isinstance(self.in_dict, str): 1844 self.in_dict = _importer(self.in_dict) 1845 in_dict = self.in_dict 1846 clear = self.clear 1847 1848 try: 1849 original = in_dict.copy() 1850 except AttributeError: 1851 # dict like object with no copy method 1852 # must support iteration over keys 1853 original = {} 1854 for key in in_dict: 1855 original[key] = in_dict[key] 1856 self._original = original 1857 1858 if clear: 1859 _clear_dict(in_dict) 1860 1861 try: 1862 in_dict.update(values) 1863 except AttributeError: 1864 # dict like object with no update method 1865 for key in values: 1866 in_dict[key] = values[key] 1867 1868 1869 def _unpatch_dict(self): 1870 in_dict = self.in_dict 1871 original = self._original 1872 1873 _clear_dict(in_dict) 1874 1875 try: 1876 in_dict.update(original) 1877 except AttributeError: 1878 for key in original: 1879 in_dict[key] = original[key] 1880 1881 1882 def __exit__(self, *args): 1883 """Unpatch the dict.""" 1884 if self._original is not None: 1885 self._unpatch_dict() 1886 return False 1887 1888 1889 def start(self): 1890 """Activate a patch, returning any created mock.""" 1891 result = self.__enter__() 1892 _patch._active_patches.append(self) 1893 return result 1894 1895 1896 def stop(self): 1897 """Stop an active patch.""" 1898 try: 1899 _patch._active_patches.remove(self) 1900 except ValueError: 1901 # If the patch hasn't been started this will fail 1902 return None 1903 1904 return self.__exit__(None, None, None) 1905 1906 1907def _clear_dict(in_dict): 1908 try: 1909 in_dict.clear() 1910 except AttributeError: 1911 keys = list(in_dict) 1912 for key in keys: 1913 del in_dict[key] 1914 1915 1916def _patch_stopall(): 1917 """Stop all active patches. LIFO to unroll nested patches.""" 1918 for patch in reversed(_patch._active_patches): 1919 patch.stop() 1920 1921 1922patch.object = _patch_object 1923patch.dict = _patch_dict 1924patch.multiple = _patch_multiple 1925patch.stopall = _patch_stopall 1926patch.TEST_PREFIX = 'test' 1927 1928magic_methods = ( 1929 "lt le gt ge eq ne " 1930 "getitem setitem delitem " 1931 "len contains iter " 1932 "hash str sizeof " 1933 "enter exit " 1934 # we added divmod and rdivmod here instead of numerics 1935 # because there is no idivmod 1936 "divmod rdivmod neg pos abs invert " 1937 "complex int float index " 1938 "round trunc floor ceil " 1939 "bool next " 1940 "fspath " 1941 "aiter " 1942) 1943 1944numerics = ( 1945 "add sub mul matmul div floordiv mod lshift rshift and xor or pow truediv" 1946) 1947inplace = ' '.join('i%s' % n for n in numerics.split()) 1948right = ' '.join('r%s' % n for n in numerics.split()) 1949 1950# not including __prepare__, __instancecheck__, __subclasscheck__ 1951# (as they are metaclass methods) 1952# __del__ is not supported at all as it causes problems if it exists 1953 1954_non_defaults = { 1955 '__get__', '__set__', '__delete__', '__reversed__', '__missing__', 1956 '__reduce__', '__reduce_ex__', '__getinitargs__', '__getnewargs__', 1957 '__getstate__', '__setstate__', '__getformat__', '__setformat__', 1958 '__repr__', '__dir__', '__subclasses__', '__format__', 1959 '__getnewargs_ex__', 1960} 1961 1962 1963def _get_method(name, func): 1964 "Turns a callable object (like a mock) into a real function" 1965 def method(self, /, *args, **kw): 1966 return func(self, *args, **kw) 1967 method.__name__ = name 1968 return method 1969 1970 1971_magics = { 1972 '__%s__' % method for method in 1973 ' '.join([magic_methods, numerics, inplace, right]).split() 1974} 1975 1976# Magic methods used for async `with` statements 1977_async_method_magics = {"__aenter__", "__aexit__", "__anext__"} 1978# Magic methods that are only used with async calls but are synchronous functions themselves 1979_sync_async_magics = {"__aiter__"} 1980_async_magics = _async_method_magics | _sync_async_magics 1981 1982_all_sync_magics = _magics | _non_defaults 1983_all_magics = _all_sync_magics | _async_magics 1984 1985_unsupported_magics = { 1986 '__getattr__', '__setattr__', 1987 '__init__', '__new__', '__prepare__', 1988 '__instancecheck__', '__subclasscheck__', 1989 '__del__' 1990} 1991 1992_calculate_return_value = { 1993 '__hash__': lambda self: object.__hash__(self), 1994 '__str__': lambda self: object.__str__(self), 1995 '__sizeof__': lambda self: object.__sizeof__(self), 1996 '__fspath__': lambda self: f"{type(self).__name__}/{self._extract_mock_name()}/{id(self)}", 1997} 1998 1999_return_values = { 2000 '__lt__': NotImplemented, 2001 '__gt__': NotImplemented, 2002 '__le__': NotImplemented, 2003 '__ge__': NotImplemented, 2004 '__int__': 1, 2005 '__contains__': False, 2006 '__len__': 0, 2007 '__exit__': False, 2008 '__complex__': 1j, 2009 '__float__': 1.0, 2010 '__bool__': True, 2011 '__index__': 1, 2012 '__aexit__': False, 2013} 2014 2015 2016def _get_eq(self): 2017 def __eq__(other): 2018 ret_val = self.__eq__._mock_return_value 2019 if ret_val is not DEFAULT: 2020 return ret_val 2021 if self is other: 2022 return True 2023 return NotImplemented 2024 return __eq__ 2025 2026def _get_ne(self): 2027 def __ne__(other): 2028 if self.__ne__._mock_return_value is not DEFAULT: 2029 return DEFAULT 2030 if self is other: 2031 return False 2032 return NotImplemented 2033 return __ne__ 2034 2035def _get_iter(self): 2036 def __iter__(): 2037 ret_val = self.__iter__._mock_return_value 2038 if ret_val is DEFAULT: 2039 return iter([]) 2040 # if ret_val was already an iterator, then calling iter on it should 2041 # return the iterator unchanged 2042 return iter(ret_val) 2043 return __iter__ 2044 2045def _get_async_iter(self): 2046 def __aiter__(): 2047 ret_val = self.__aiter__._mock_return_value 2048 if ret_val is DEFAULT: 2049 return _AsyncIterator(iter([])) 2050 return _AsyncIterator(iter(ret_val)) 2051 return __aiter__ 2052 2053_side_effect_methods = { 2054 '__eq__': _get_eq, 2055 '__ne__': _get_ne, 2056 '__iter__': _get_iter, 2057 '__aiter__': _get_async_iter 2058} 2059 2060 2061 2062def _set_return_value(mock, method, name): 2063 fixed = _return_values.get(name, DEFAULT) 2064 if fixed is not DEFAULT: 2065 method.return_value = fixed 2066 return 2067 2068 return_calculator = _calculate_return_value.get(name) 2069 if return_calculator is not None: 2070 return_value = return_calculator(mock) 2071 method.return_value = return_value 2072 return 2073 2074 side_effector = _side_effect_methods.get(name) 2075 if side_effector is not None: 2076 method.side_effect = side_effector(mock) 2077 2078 2079 2080class MagicMixin(Base): 2081 def __init__(self, /, *args, **kw): 2082 self._mock_set_magics() # make magic work for kwargs in init 2083 _safe_super(MagicMixin, self).__init__(*args, **kw) 2084 self._mock_set_magics() # fix magic broken by upper level init 2085 2086 2087 def _mock_set_magics(self): 2088 orig_magics = _magics | _async_method_magics 2089 these_magics = orig_magics 2090 2091 if getattr(self, "_mock_methods", None) is not None: 2092 these_magics = orig_magics.intersection(self._mock_methods) 2093 2094 remove_magics = set() 2095 remove_magics = orig_magics - these_magics 2096 2097 for entry in remove_magics: 2098 if entry in type(self).__dict__: 2099 # remove unneeded magic methods 2100 delattr(self, entry) 2101 2102 # don't overwrite existing attributes if called a second time 2103 these_magics = these_magics - set(type(self).__dict__) 2104 2105 _type = type(self) 2106 for entry in these_magics: 2107 setattr(_type, entry, MagicProxy(entry, self)) 2108 2109 2110 2111class NonCallableMagicMock(MagicMixin, NonCallableMock): 2112 """A version of `MagicMock` that isn't callable.""" 2113 def mock_add_spec(self, spec, spec_set=False): 2114 """Add a spec to a mock. `spec` can either be an object or a 2115 list of strings. Only attributes on the `spec` can be fetched as 2116 attributes from the mock. 2117 2118 If `spec_set` is True then only attributes on the spec can be set.""" 2119 self._mock_add_spec(spec, spec_set) 2120 self._mock_set_magics() 2121 2122 2123class AsyncMagicMixin(MagicMixin): 2124 def __init__(self, /, *args, **kw): 2125 self._mock_set_magics() # make magic work for kwargs in init 2126 _safe_super(AsyncMagicMixin, self).__init__(*args, **kw) 2127 self._mock_set_magics() # fix magic broken by upper level init 2128 2129class MagicMock(MagicMixin, Mock): 2130 """ 2131 MagicMock is a subclass of Mock with default implementations 2132 of most of the magic methods. You can use MagicMock without having to 2133 configure the magic methods yourself. 2134 2135 If you use the `spec` or `spec_set` arguments then *only* magic 2136 methods that exist in the spec will be created. 2137 2138 Attributes and the return value of a `MagicMock` will also be `MagicMocks`. 2139 """ 2140 def mock_add_spec(self, spec, spec_set=False): 2141 """Add a spec to a mock. `spec` can either be an object or a 2142 list of strings. Only attributes on the `spec` can be fetched as 2143 attributes from the mock. 2144 2145 If `spec_set` is True then only attributes on the spec can be set.""" 2146 self._mock_add_spec(spec, spec_set) 2147 self._mock_set_magics() 2148 2149 2150 2151class MagicProxy(Base): 2152 def __init__(self, name, parent): 2153 self.name = name 2154 self.parent = parent 2155 2156 def create_mock(self): 2157 entry = self.name 2158 parent = self.parent 2159 m = parent._get_child_mock(name=entry, _new_name=entry, 2160 _new_parent=parent) 2161 setattr(parent, entry, m) 2162 _set_return_value(parent, m, entry) 2163 return m 2164 2165 def __get__(self, obj, _type=None): 2166 return self.create_mock() 2167 2168 2169class AsyncMockMixin(Base): 2170 await_count = _delegating_property('await_count') 2171 await_args = _delegating_property('await_args') 2172 await_args_list = _delegating_property('await_args_list') 2173 2174 def __init__(self, /, *args, **kwargs): 2175 super().__init__(*args, **kwargs) 2176 # iscoroutinefunction() checks _is_coroutine property to say if an 2177 # object is a coroutine. Without this check it looks to see if it is a 2178 # function/method, which in this case it is not (since it is an 2179 # AsyncMock). 2180 # It is set through __dict__ because when spec_set is True, this 2181 # attribute is likely undefined. 2182 self.__dict__['_is_coroutine'] = asyncio.coroutines._is_coroutine 2183 self.__dict__['_mock_await_count'] = 0 2184 self.__dict__['_mock_await_args'] = None 2185 self.__dict__['_mock_await_args_list'] = _CallList() 2186 code_mock = NonCallableMock(spec_set=CodeType) 2187 code_mock.co_flags = inspect.CO_COROUTINE 2188 self.__dict__['__code__'] = code_mock 2189 2190 async def _execute_mock_call(self, /, *args, **kwargs): 2191 # This is nearly just like super(), except for special handling 2192 # of coroutines 2193 2194 _call = _Call((args, kwargs), two=True) 2195 self.await_count += 1 2196 self.await_args = _call 2197 self.await_args_list.append(_call) 2198 2199 effect = self.side_effect 2200 if effect is not None: 2201 if _is_exception(effect): 2202 raise effect 2203 elif not _callable(effect): 2204 try: 2205 result = next(effect) 2206 except StopIteration: 2207 # It is impossible to propogate a StopIteration 2208 # through coroutines because of PEP 479 2209 raise StopAsyncIteration 2210 if _is_exception(result): 2211 raise result 2212 elif iscoroutinefunction(effect): 2213 result = await effect(*args, **kwargs) 2214 else: 2215 result = effect(*args, **kwargs) 2216 2217 if result is not DEFAULT: 2218 return result 2219 2220 if self._mock_return_value is not DEFAULT: 2221 return self.return_value 2222 2223 if self._mock_wraps is not None: 2224 if iscoroutinefunction(self._mock_wraps): 2225 return await self._mock_wraps(*args, **kwargs) 2226 return self._mock_wraps(*args, **kwargs) 2227 2228 return self.return_value 2229 2230 def assert_awaited(self): 2231 """ 2232 Assert that the mock was awaited at least once. 2233 """ 2234 if self.await_count == 0: 2235 msg = f"Expected {self._mock_name or 'mock'} to have been awaited." 2236 raise AssertionError(msg) 2237 2238 def assert_awaited_once(self): 2239 """ 2240 Assert that the mock was awaited exactly once. 2241 """ 2242 if not self.await_count == 1: 2243 msg = (f"Expected {self._mock_name or 'mock'} to have been awaited once." 2244 f" Awaited {self.await_count} times.") 2245 raise AssertionError(msg) 2246 2247 def assert_awaited_with(self, /, *args, **kwargs): 2248 """ 2249 Assert that the last await was with the specified arguments. 2250 """ 2251 if self.await_args is None: 2252 expected = self._format_mock_call_signature(args, kwargs) 2253 raise AssertionError(f'Expected await: {expected}\nNot awaited') 2254 2255 def _error_message(): 2256 msg = self._format_mock_failure_message(args, kwargs, action='await') 2257 return msg 2258 2259 expected = self._call_matcher(_Call((args, kwargs), two=True)) 2260 actual = self._call_matcher(self.await_args) 2261 if actual != expected: 2262 cause = expected if isinstance(expected, Exception) else None 2263 raise AssertionError(_error_message()) from cause 2264 2265 def assert_awaited_once_with(self, /, *args, **kwargs): 2266 """ 2267 Assert that the mock was awaited exactly once and with the specified 2268 arguments. 2269 """ 2270 if not self.await_count == 1: 2271 msg = (f"Expected {self._mock_name or 'mock'} to have been awaited once." 2272 f" Awaited {self.await_count} times.") 2273 raise AssertionError(msg) 2274 return self.assert_awaited_with(*args, **kwargs) 2275 2276 def assert_any_await(self, /, *args, **kwargs): 2277 """ 2278 Assert the mock has ever been awaited with the specified arguments. 2279 """ 2280 expected = self._call_matcher(_Call((args, kwargs), two=True)) 2281 cause = expected if isinstance(expected, Exception) else None 2282 actual = [self._call_matcher(c) for c in self.await_args_list] 2283 if cause or expected not in _AnyComparer(actual): 2284 expected_string = self._format_mock_call_signature(args, kwargs) 2285 raise AssertionError( 2286 '%s await not found' % expected_string 2287 ) from cause 2288 2289 def assert_has_awaits(self, calls, any_order=False): 2290 """ 2291 Assert the mock has been awaited with the specified calls. 2292 The :attr:`await_args_list` list is checked for the awaits. 2293 2294 If `any_order` is False (the default) then the awaits must be 2295 sequential. There can be extra calls before or after the 2296 specified awaits. 2297 2298 If `any_order` is True then the awaits can be in any order, but 2299 they must all appear in :attr:`await_args_list`. 2300 """ 2301 expected = [self._call_matcher(c) for c in calls] 2302 cause = next((e for e in expected if isinstance(e, Exception)), None) 2303 all_awaits = _CallList(self._call_matcher(c) for c in self.await_args_list) 2304 if not any_order: 2305 if expected not in all_awaits: 2306 if cause is None: 2307 problem = 'Awaits not found.' 2308 else: 2309 problem = ('Error processing expected awaits.\n' 2310 'Errors: {}').format( 2311 [e if isinstance(e, Exception) else None 2312 for e in expected]) 2313 raise AssertionError( 2314 f'{problem}\n' 2315 f'Expected: {_CallList(calls)}\n' 2316 f'Actual: {self.await_args_list}' 2317 ) from cause 2318 return 2319 2320 all_awaits = list(all_awaits) 2321 2322 not_found = [] 2323 for kall in expected: 2324 try: 2325 all_awaits.remove(kall) 2326 except ValueError: 2327 not_found.append(kall) 2328 if not_found: 2329 raise AssertionError( 2330 '%r not all found in await list' % (tuple(not_found),) 2331 ) from cause 2332 2333 def assert_not_awaited(self): 2334 """ 2335 Assert that the mock was never awaited. 2336 """ 2337 if self.await_count != 0: 2338 msg = (f"Expected {self._mock_name or 'mock'} to not have been awaited." 2339 f" Awaited {self.await_count} times.") 2340 raise AssertionError(msg) 2341 2342 def reset_mock(self, /, *args, **kwargs): 2343 """ 2344 See :func:`.Mock.reset_mock()` 2345 """ 2346 super().reset_mock(*args, **kwargs) 2347 self.await_count = 0 2348 self.await_args = None 2349 self.await_args_list = _CallList() 2350 2351 2352class AsyncMock(AsyncMockMixin, AsyncMagicMixin, Mock): 2353 """ 2354 Enhance :class:`Mock` with features allowing to mock 2355 an async function. 2356 2357 The :class:`AsyncMock` object will behave so the object is 2358 recognized as an async function, and the result of a call is an awaitable: 2359 2360 >>> mock = AsyncMock() 2361 >>> iscoroutinefunction(mock) 2362 True 2363 >>> inspect.isawaitable(mock()) 2364 True 2365 2366 2367 The result of ``mock()`` is an async function which will have the outcome 2368 of ``side_effect`` or ``return_value``: 2369 2370 - if ``side_effect`` is a function, the async function will return the 2371 result of that function, 2372 - if ``side_effect`` is an exception, the async function will raise the 2373 exception, 2374 - if ``side_effect`` is an iterable, the async function will return the 2375 next value of the iterable, however, if the sequence of result is 2376 exhausted, ``StopIteration`` is raised immediately, 2377 - if ``side_effect`` is not defined, the async function will return the 2378 value defined by ``return_value``, hence, by default, the async function 2379 returns a new :class:`AsyncMock` object. 2380 2381 If the outcome of ``side_effect`` or ``return_value`` is an async function, 2382 the mock async function obtained when the mock object is called will be this 2383 async function itself (and not an async function returning an async 2384 function). 2385 2386 The test author can also specify a wrapped object with ``wraps``. In this 2387 case, the :class:`Mock` object behavior is the same as with an 2388 :class:`.Mock` object: the wrapped object may have methods 2389 defined as async function functions. 2390 2391 Based on Martin Richard's asynctest project. 2392 """ 2393 2394 2395class _ANY(object): 2396 "A helper object that compares equal to everything." 2397 2398 def __eq__(self, other): 2399 return True 2400 2401 def __ne__(self, other): 2402 return False 2403 2404 def __repr__(self): 2405 return '<ANY>' 2406 2407ANY = _ANY() 2408 2409 2410 2411def _format_call_signature(name, args, kwargs): 2412 message = '%s(%%s)' % name 2413 formatted_args = '' 2414 args_string = ', '.join([repr(arg) for arg in args]) 2415 kwargs_string = ', '.join([ 2416 '%s=%r' % (key, value) for key, value in kwargs.items() 2417 ]) 2418 if args_string: 2419 formatted_args = args_string 2420 if kwargs_string: 2421 if formatted_args: 2422 formatted_args += ', ' 2423 formatted_args += kwargs_string 2424 2425 return message % formatted_args 2426 2427 2428 2429class _Call(tuple): 2430 """ 2431 A tuple for holding the results of a call to a mock, either in the form 2432 `(args, kwargs)` or `(name, args, kwargs)`. 2433 2434 If args or kwargs are empty then a call tuple will compare equal to 2435 a tuple without those values. This makes comparisons less verbose:: 2436 2437 _Call(('name', (), {})) == ('name',) 2438 _Call(('name', (1,), {})) == ('name', (1,)) 2439 _Call(((), {'a': 'b'})) == ({'a': 'b'},) 2440 2441 The `_Call` object provides a useful shortcut for comparing with call:: 2442 2443 _Call(((1, 2), {'a': 3})) == call(1, 2, a=3) 2444 _Call(('foo', (1, 2), {'a': 3})) == call.foo(1, 2, a=3) 2445 2446 If the _Call has no name then it will match any name. 2447 """ 2448 def __new__(cls, value=(), name='', parent=None, two=False, 2449 from_kall=True): 2450 args = () 2451 kwargs = {} 2452 _len = len(value) 2453 if _len == 3: 2454 name, args, kwargs = value 2455 elif _len == 2: 2456 first, second = value 2457 if isinstance(first, str): 2458 name = first 2459 if isinstance(second, tuple): 2460 args = second 2461 else: 2462 kwargs = second 2463 else: 2464 args, kwargs = first, second 2465 elif _len == 1: 2466 value, = value 2467 if isinstance(value, str): 2468 name = value 2469 elif isinstance(value, tuple): 2470 args = value 2471 else: 2472 kwargs = value 2473 2474 if two: 2475 return tuple.__new__(cls, (args, kwargs)) 2476 2477 return tuple.__new__(cls, (name, args, kwargs)) 2478 2479 2480 def __init__(self, value=(), name=None, parent=None, two=False, 2481 from_kall=True): 2482 self._mock_name = name 2483 self._mock_parent = parent 2484 self._mock_from_kall = from_kall 2485 2486 2487 def __eq__(self, other): 2488 try: 2489 len_other = len(other) 2490 except TypeError: 2491 return NotImplemented 2492 2493 self_name = '' 2494 if len(self) == 2: 2495 self_args, self_kwargs = self 2496 else: 2497 self_name, self_args, self_kwargs = self 2498 2499 if (getattr(self, '_mock_parent', None) and getattr(other, '_mock_parent', None) 2500 and self._mock_parent != other._mock_parent): 2501 return False 2502 2503 other_name = '' 2504 if len_other == 0: 2505 other_args, other_kwargs = (), {} 2506 elif len_other == 3: 2507 other_name, other_args, other_kwargs = other 2508 elif len_other == 1: 2509 value, = other 2510 if isinstance(value, tuple): 2511 other_args = value 2512 other_kwargs = {} 2513 elif isinstance(value, str): 2514 other_name = value 2515 other_args, other_kwargs = (), {} 2516 else: 2517 other_args = () 2518 other_kwargs = value 2519 elif len_other == 2: 2520 # could be (name, args) or (name, kwargs) or (args, kwargs) 2521 first, second = other 2522 if isinstance(first, str): 2523 other_name = first 2524 if isinstance(second, tuple): 2525 other_args, other_kwargs = second, {} 2526 else: 2527 other_args, other_kwargs = (), second 2528 else: 2529 other_args, other_kwargs = first, second 2530 else: 2531 return False 2532 2533 if self_name and other_name != self_name: 2534 return False 2535 2536 # this order is important for ANY to work! 2537 return (other_args, other_kwargs) == (self_args, self_kwargs) 2538 2539 2540 __ne__ = object.__ne__ 2541 2542 2543 def __call__(self, /, *args, **kwargs): 2544 if self._mock_name is None: 2545 return _Call(('', args, kwargs), name='()') 2546 2547 name = self._mock_name + '()' 2548 return _Call((self._mock_name, args, kwargs), name=name, parent=self) 2549 2550 2551 def __getattr__(self, attr): 2552 if self._mock_name is None: 2553 return _Call(name=attr, from_kall=False) 2554 name = '%s.%s' % (self._mock_name, attr) 2555 return _Call(name=name, parent=self, from_kall=False) 2556 2557 2558 def __getattribute__(self, attr): 2559 if attr in tuple.__dict__: 2560 raise AttributeError 2561 return tuple.__getattribute__(self, attr) 2562 2563 2564 def _get_call_arguments(self): 2565 if len(self) == 2: 2566 args, kwargs = self 2567 else: 2568 name, args, kwargs = self 2569 2570 return args, kwargs 2571 2572 @property 2573 def args(self): 2574 return self._get_call_arguments()[0] 2575 2576 @property 2577 def kwargs(self): 2578 return self._get_call_arguments()[1] 2579 2580 def __repr__(self): 2581 if not self._mock_from_kall: 2582 name = self._mock_name or 'call' 2583 if name.startswith('()'): 2584 name = 'call%s' % name 2585 return name 2586 2587 if len(self) == 2: 2588 name = 'call' 2589 args, kwargs = self 2590 else: 2591 name, args, kwargs = self 2592 if not name: 2593 name = 'call' 2594 elif not name.startswith('()'): 2595 name = 'call.%s' % name 2596 else: 2597 name = 'call%s' % name 2598 return _format_call_signature(name, args, kwargs) 2599 2600 2601 def call_list(self): 2602 """For a call object that represents multiple calls, `call_list` 2603 returns a list of all the intermediate calls as well as the 2604 final call.""" 2605 vals = [] 2606 thing = self 2607 while thing is not None: 2608 if thing._mock_from_kall: 2609 vals.append(thing) 2610 thing = thing._mock_parent 2611 return _CallList(reversed(vals)) 2612 2613 2614call = _Call(from_kall=False) 2615 2616 2617def create_autospec(spec, spec_set=False, instance=False, _parent=None, 2618 _name=None, *, unsafe=False, **kwargs): 2619 """Create a mock object using another object as a spec. Attributes on the 2620 mock will use the corresponding attribute on the `spec` object as their 2621 spec. 2622 2623 Functions or methods being mocked will have their arguments checked 2624 to check that they are called with the correct signature. 2625 2626 If `spec_set` is True then attempting to set attributes that don't exist 2627 on the spec object will raise an `AttributeError`. 2628 2629 If a class is used as a spec then the return value of the mock (the 2630 instance of the class) will have the same spec. You can use a class as the 2631 spec for an instance object by passing `instance=True`. The returned mock 2632 will only be callable if instances of the mock are callable. 2633 2634 `create_autospec` will raise a `RuntimeError` if passed some common 2635 misspellings of the arguments autospec and spec_set. Pass the argument 2636 `unsafe` with the value True to disable that check. 2637 2638 `create_autospec` also takes arbitrary keyword arguments that are passed to 2639 the constructor of the created mock.""" 2640 if _is_list(spec): 2641 # can't pass a list instance to the mock constructor as it will be 2642 # interpreted as a list of strings 2643 spec = type(spec) 2644 2645 is_type = isinstance(spec, type) 2646 if _is_instance_mock(spec): 2647 raise InvalidSpecError(f'Cannot autospec a Mock object. ' 2648 f'[object={spec!r}]') 2649 is_async_func = _is_async_func(spec) 2650 _kwargs = {'spec': spec} 2651 if spec_set: 2652 _kwargs = {'spec_set': spec} 2653 elif spec is None: 2654 # None we mock with a normal mock without a spec 2655 _kwargs = {} 2656 if _kwargs and instance: 2657 _kwargs['_spec_as_instance'] = True 2658 if not unsafe: 2659 _check_spec_arg_typos(kwargs) 2660 2661 _kwargs.update(kwargs) 2662 2663 Klass = MagicMock 2664 if inspect.isdatadescriptor(spec): 2665 # descriptors don't have a spec 2666 # because we don't know what type they return 2667 _kwargs = {} 2668 elif is_async_func: 2669 if instance: 2670 raise RuntimeError("Instance can not be True when create_autospec " 2671 "is mocking an async function") 2672 Klass = AsyncMock 2673 elif not _callable(spec): 2674 Klass = NonCallableMagicMock 2675 elif is_type and instance and not _instance_callable(spec): 2676 Klass = NonCallableMagicMock 2677 2678 _name = _kwargs.pop('name', _name) 2679 2680 _new_name = _name 2681 if _parent is None: 2682 # for a top level object no _new_name should be set 2683 _new_name = '' 2684 2685 mock = Klass(parent=_parent, _new_parent=_parent, _new_name=_new_name, 2686 name=_name, **_kwargs) 2687 2688 if isinstance(spec, FunctionTypes): 2689 # should only happen at the top level because we don't 2690 # recurse for functions 2691 mock = _set_signature(mock, spec) 2692 if is_async_func: 2693 _setup_async_mock(mock) 2694 else: 2695 _check_signature(spec, mock, is_type, instance) 2696 2697 if _parent is not None and not instance: 2698 _parent._mock_children[_name] = mock 2699 2700 if is_type and not instance and 'return_value' not in kwargs: 2701 mock.return_value = create_autospec(spec, spec_set, instance=True, 2702 _name='()', _parent=mock) 2703 2704 for entry in dir(spec): 2705 if _is_magic(entry): 2706 # MagicMock already does the useful magic methods for us 2707 continue 2708 2709 # XXXX do we need a better way of getting attributes without 2710 # triggering code execution (?) Probably not - we need the actual 2711 # object to mock it so we would rather trigger a property than mock 2712 # the property descriptor. Likewise we want to mock out dynamically 2713 # provided attributes. 2714 # XXXX what about attributes that raise exceptions other than 2715 # AttributeError on being fetched? 2716 # we could be resilient against it, or catch and propagate the 2717 # exception when the attribute is fetched from the mock 2718 try: 2719 original = getattr(spec, entry) 2720 except AttributeError: 2721 continue 2722 2723 kwargs = {'spec': original} 2724 if spec_set: 2725 kwargs = {'spec_set': original} 2726 2727 if not isinstance(original, FunctionTypes): 2728 new = _SpecState(original, spec_set, mock, entry, instance) 2729 mock._mock_children[entry] = new 2730 else: 2731 parent = mock 2732 if isinstance(spec, FunctionTypes): 2733 parent = mock.mock 2734 2735 skipfirst = _must_skip(spec, entry, is_type) 2736 kwargs['_eat_self'] = skipfirst 2737 if iscoroutinefunction(original): 2738 child_klass = AsyncMock 2739 else: 2740 child_klass = MagicMock 2741 new = child_klass(parent=parent, name=entry, _new_name=entry, 2742 _new_parent=parent, 2743 **kwargs) 2744 mock._mock_children[entry] = new 2745 _check_signature(original, new, skipfirst=skipfirst) 2746 2747 # so functions created with _set_signature become instance attributes, 2748 # *plus* their underlying mock exists in _mock_children of the parent 2749 # mock. Adding to _mock_children may be unnecessary where we are also 2750 # setting as an instance attribute? 2751 if isinstance(new, FunctionTypes): 2752 setattr(mock, entry, new) 2753 2754 return mock 2755 2756 2757def _must_skip(spec, entry, is_type): 2758 """ 2759 Return whether we should skip the first argument on spec's `entry` 2760 attribute. 2761 """ 2762 if not isinstance(spec, type): 2763 if entry in getattr(spec, '__dict__', {}): 2764 # instance attribute - shouldn't skip 2765 return False 2766 spec = spec.__class__ 2767 2768 for klass in spec.__mro__: 2769 result = klass.__dict__.get(entry, DEFAULT) 2770 if result is DEFAULT: 2771 continue 2772 if isinstance(result, (staticmethod, classmethod)): 2773 return False 2774 elif isinstance(result, FunctionTypes): 2775 # Normal method => skip if looked up on type 2776 # (if looked up on instance, self is already skipped) 2777 return is_type 2778 else: 2779 return False 2780 2781 # function is a dynamically provided attribute 2782 return is_type 2783 2784 2785class _SpecState(object): 2786 2787 def __init__(self, spec, spec_set=False, parent=None, 2788 name=None, ids=None, instance=False): 2789 self.spec = spec 2790 self.ids = ids 2791 self.spec_set = spec_set 2792 self.parent = parent 2793 self.instance = instance 2794 self.name = name 2795 2796 2797FunctionTypes = ( 2798 # python function 2799 type(create_autospec), 2800 # instance method 2801 type(ANY.__eq__), 2802) 2803 2804 2805file_spec = None 2806 2807 2808def _to_stream(read_data): 2809 if isinstance(read_data, bytes): 2810 return io.BytesIO(read_data) 2811 else: 2812 return io.StringIO(read_data) 2813 2814 2815def mock_open(mock=None, read_data=''): 2816 """ 2817 A helper function to create a mock to replace the use of `open`. It works 2818 for `open` called directly or used as a context manager. 2819 2820 The `mock` argument is the mock object to configure. If `None` (the 2821 default) then a `MagicMock` will be created for you, with the API limited 2822 to methods or attributes available on standard file handles. 2823 2824 `read_data` is a string for the `read`, `readline` and `readlines` of the 2825 file handle to return. This is an empty string by default. 2826 """ 2827 _read_data = _to_stream(read_data) 2828 _state = [_read_data, None] 2829 2830 def _readlines_side_effect(*args, **kwargs): 2831 if handle.readlines.return_value is not None: 2832 return handle.readlines.return_value 2833 return _state[0].readlines(*args, **kwargs) 2834 2835 def _read_side_effect(*args, **kwargs): 2836 if handle.read.return_value is not None: 2837 return handle.read.return_value 2838 return _state[0].read(*args, **kwargs) 2839 2840 def _readline_side_effect(*args, **kwargs): 2841 yield from _iter_side_effect() 2842 while True: 2843 yield _state[0].readline(*args, **kwargs) 2844 2845 def _iter_side_effect(): 2846 if handle.readline.return_value is not None: 2847 while True: 2848 yield handle.readline.return_value 2849 for line in _state[0]: 2850 yield line 2851 2852 def _next_side_effect(): 2853 if handle.readline.return_value is not None: 2854 return handle.readline.return_value 2855 return next(_state[0]) 2856 2857 global file_spec 2858 if file_spec is None: 2859 import _io 2860 file_spec = list(set(dir(_io.TextIOWrapper)).union(set(dir(_io.BytesIO)))) 2861 2862 if mock is None: 2863 mock = MagicMock(name='open', spec=open) 2864 2865 handle = MagicMock(spec=file_spec) 2866 handle.__enter__.return_value = handle 2867 2868 handle.write.return_value = None 2869 handle.read.return_value = None 2870 handle.readline.return_value = None 2871 handle.readlines.return_value = None 2872 2873 handle.read.side_effect = _read_side_effect 2874 _state[1] = _readline_side_effect() 2875 handle.readline.side_effect = _state[1] 2876 handle.readlines.side_effect = _readlines_side_effect 2877 handle.__iter__.side_effect = _iter_side_effect 2878 handle.__next__.side_effect = _next_side_effect 2879 2880 def reset_data(*args, **kwargs): 2881 _state[0] = _to_stream(read_data) 2882 if handle.readline.side_effect == _state[1]: 2883 # Only reset the side effect if the user hasn't overridden it. 2884 _state[1] = _readline_side_effect() 2885 handle.readline.side_effect = _state[1] 2886 return DEFAULT 2887 2888 mock.side_effect = reset_data 2889 mock.return_value = handle 2890 return mock 2891 2892 2893class PropertyMock(Mock): 2894 """ 2895 A mock intended to be used as a property, or other descriptor, on a class. 2896 `PropertyMock` provides `__get__` and `__set__` methods so you can specify 2897 a return value when it is fetched. 2898 2899 Fetching a `PropertyMock` instance from an object calls the mock, with 2900 no args. Setting it calls the mock with the value being set. 2901 """ 2902 def _get_child_mock(self, /, **kwargs): 2903 return MagicMock(**kwargs) 2904 2905 def __get__(self, obj, obj_type=None): 2906 return self() 2907 def __set__(self, obj, val): 2908 self(val) 2909 2910 2911def seal(mock): 2912 """Disable the automatic generation of child mocks. 2913 2914 Given an input Mock, seals it to ensure no further mocks will be generated 2915 when accessing an attribute that was not already defined. 2916 2917 The operation recursively seals the mock passed in, meaning that 2918 the mock itself, any mocks generated by accessing one of its attributes, 2919 and all assigned mocks without a name or spec will be sealed. 2920 """ 2921 mock._mock_sealed = True 2922 for attr in dir(mock): 2923 try: 2924 m = getattr(mock, attr) 2925 except AttributeError: 2926 continue 2927 if not isinstance(m, NonCallableMock): 2928 continue 2929 if isinstance(m._mock_children.get(attr), _SpecState): 2930 continue 2931 if m._mock_new_parent is mock: 2932 seal(m) 2933 2934 2935class _AsyncIterator: 2936 """ 2937 Wraps an iterator in an asynchronous iterator. 2938 """ 2939 def __init__(self, iterator): 2940 self.iterator = iterator 2941 code_mock = NonCallableMock(spec_set=CodeType) 2942 code_mock.co_flags = inspect.CO_ITERABLE_COROUTINE 2943 self.__dict__['__code__'] = code_mock 2944 2945 async def __anext__(self): 2946 try: 2947 return next(self.iterator) 2948 except StopIteration: 2949 pass 2950 raise StopAsyncIteration 2951