1# mock.py 2# Test tools for mocking and patching. 3# Maintained by Michael Foord 4# Backport for other versions of Python available from 5# https://pypi.org/project/mock 6 7__all__ = ( 8 'Mock', 9 'MagicMock', 10 'patch', 11 'sentinel', 12 'DEFAULT', 13 'ANY', 14 'call', 15 'create_autospec', 16 'AsyncMock', 17 'FILTER_DIR', 18 'NonCallableMock', 19 'NonCallableMagicMock', 20 'mock_open', 21 'PropertyMock', 22 'seal', 23) 24 25 26import asyncio 27import contextlib 28import io 29import inspect 30import pprint 31import sys 32import builtins 33from asyncio import iscoroutinefunction 34from types import CodeType, ModuleType, MethodType 35from unittest.util import safe_repr 36from functools import wraps, partial 37 38 39_builtins = {name for name in dir(builtins) if not name.startswith('_')} 40 41FILTER_DIR = True 42 43# Workaround for issue #12370 44# Without this, the __class__ properties wouldn't be set correctly 45_safe_super = super 46 47def _is_async_obj(obj): 48 if _is_instance_mock(obj) and not isinstance(obj, AsyncMock): 49 return False 50 if hasattr(obj, '__func__'): 51 obj = getattr(obj, '__func__') 52 return iscoroutinefunction(obj) or inspect.isawaitable(obj) 53 54 55def _is_async_func(func): 56 if getattr(func, '__code__', None): 57 return iscoroutinefunction(func) 58 else: 59 return False 60 61 62def _is_instance_mock(obj): 63 # can't use isinstance on Mock objects because they override __class__ 64 # The base class for all mocks is NonCallableMock 65 return issubclass(type(obj), NonCallableMock) 66 67 68def _is_exception(obj): 69 return ( 70 isinstance(obj, BaseException) or 71 isinstance(obj, type) and issubclass(obj, BaseException) 72 ) 73 74 75def _extract_mock(obj): 76 # Autospecced functions will return a FunctionType with "mock" attribute 77 # which is the actual mock object that needs to be used. 78 if isinstance(obj, FunctionTypes) and hasattr(obj, 'mock'): 79 return obj.mock 80 else: 81 return obj 82 83 84def _get_signature_object(func, as_instance, eat_self): 85 """ 86 Given an arbitrary, possibly callable object, try to create a suitable 87 signature object. 88 Return a (reduced func, signature) tuple, or None. 89 """ 90 if isinstance(func, type) and not as_instance: 91 # If it's a type and should be modelled as a type, use __init__. 92 func = func.__init__ 93 # Skip the `self` argument in __init__ 94 eat_self = True 95 elif not isinstance(func, FunctionTypes): 96 # If we really want to model an instance of the passed type, 97 # __call__ should be looked up, not __init__. 98 try: 99 func = func.__call__ 100 except AttributeError: 101 return None 102 if eat_self: 103 sig_func = partial(func, None) 104 else: 105 sig_func = func 106 try: 107 return func, inspect.signature(sig_func) 108 except ValueError: 109 # Certain callable types are not supported by inspect.signature() 110 return None 111 112 113def _check_signature(func, mock, skipfirst, instance=False): 114 sig = _get_signature_object(func, instance, skipfirst) 115 if sig is None: 116 return 117 func, sig = sig 118 def checksig(self, /, *args, **kwargs): 119 sig.bind(*args, **kwargs) 120 _copy_func_details(func, checksig) 121 type(mock)._mock_check_sig = checksig 122 type(mock).__signature__ = sig 123 124 125def _copy_func_details(func, funcopy): 126 # we explicitly don't copy func.__dict__ into this copy as it would 127 # expose original attributes that should be mocked 128 for attribute in ( 129 '__name__', '__doc__', '__text_signature__', 130 '__module__', '__defaults__', '__kwdefaults__', 131 ): 132 try: 133 setattr(funcopy, attribute, getattr(func, attribute)) 134 except AttributeError: 135 pass 136 137 138def _callable(obj): 139 if isinstance(obj, type): 140 return True 141 if isinstance(obj, (staticmethod, classmethod, MethodType)): 142 return _callable(obj.__func__) 143 if getattr(obj, '__call__', None) is not None: 144 return True 145 return False 146 147 148def _is_list(obj): 149 # checks for list or tuples 150 # XXXX badly named! 151 return type(obj) in (list, tuple) 152 153 154def _instance_callable(obj): 155 """Given an object, return True if the object is callable. 156 For classes, return True if instances would be callable.""" 157 if not isinstance(obj, type): 158 # already an instance 159 return getattr(obj, '__call__', None) is not None 160 161 # *could* be broken by a class overriding __mro__ or __dict__ via 162 # a metaclass 163 for base in (obj,) + obj.__mro__: 164 if base.__dict__.get('__call__') is not None: 165 return True 166 return False 167 168 169def _set_signature(mock, original, instance=False): 170 # creates a function with signature (*args, **kwargs) that delegates to a 171 # mock. It still does signature checking by calling a lambda with the same 172 # signature as the original. 173 174 skipfirst = isinstance(original, type) 175 result = _get_signature_object(original, instance, skipfirst) 176 if result is None: 177 return mock 178 func, sig = result 179 def checksig(*args, **kwargs): 180 sig.bind(*args, **kwargs) 181 _copy_func_details(func, checksig) 182 183 name = original.__name__ 184 if not name.isidentifier(): 185 name = 'funcopy' 186 context = {'_checksig_': checksig, 'mock': mock} 187 src = """def %s(*args, **kwargs): 188 _checksig_(*args, **kwargs) 189 return mock(*args, **kwargs)""" % name 190 exec (src, context) 191 funcopy = context[name] 192 _setup_func(funcopy, mock, sig) 193 return funcopy 194 195 196def _setup_func(funcopy, mock, sig): 197 funcopy.mock = mock 198 199 def assert_called_with(*args, **kwargs): 200 return mock.assert_called_with(*args, **kwargs) 201 def assert_called(*args, **kwargs): 202 return mock.assert_called(*args, **kwargs) 203 def assert_not_called(*args, **kwargs): 204 return mock.assert_not_called(*args, **kwargs) 205 def assert_called_once(*args, **kwargs): 206 return mock.assert_called_once(*args, **kwargs) 207 def assert_called_once_with(*args, **kwargs): 208 return mock.assert_called_once_with(*args, **kwargs) 209 def assert_has_calls(*args, **kwargs): 210 return mock.assert_has_calls(*args, **kwargs) 211 def assert_any_call(*args, **kwargs): 212 return mock.assert_any_call(*args, **kwargs) 213 def reset_mock(): 214 funcopy.method_calls = _CallList() 215 funcopy.mock_calls = _CallList() 216 mock.reset_mock() 217 ret = funcopy.return_value 218 if _is_instance_mock(ret) and not ret is mock: 219 ret.reset_mock() 220 221 funcopy.called = False 222 funcopy.call_count = 0 223 funcopy.call_args = None 224 funcopy.call_args_list = _CallList() 225 funcopy.method_calls = _CallList() 226 funcopy.mock_calls = _CallList() 227 228 funcopy.return_value = mock.return_value 229 funcopy.side_effect = mock.side_effect 230 funcopy._mock_children = mock._mock_children 231 232 funcopy.assert_called_with = assert_called_with 233 funcopy.assert_called_once_with = assert_called_once_with 234 funcopy.assert_has_calls = assert_has_calls 235 funcopy.assert_any_call = assert_any_call 236 funcopy.reset_mock = reset_mock 237 funcopy.assert_called = assert_called 238 funcopy.assert_not_called = assert_not_called 239 funcopy.assert_called_once = assert_called_once 240 funcopy.__signature__ = sig 241 242 mock._mock_delegate = funcopy 243 244 245def _setup_async_mock(mock): 246 mock._is_coroutine = asyncio.coroutines._is_coroutine 247 mock.await_count = 0 248 mock.await_args = None 249 mock.await_args_list = _CallList() 250 251 # Mock is not configured yet so the attributes are set 252 # to a function and then the corresponding mock helper function 253 # is called when the helper is accessed similar to _setup_func. 254 def wrapper(attr, /, *args, **kwargs): 255 return getattr(mock.mock, attr)(*args, **kwargs) 256 257 for attribute in ('assert_awaited', 258 'assert_awaited_once', 259 'assert_awaited_with', 260 'assert_awaited_once_with', 261 'assert_any_await', 262 'assert_has_awaits', 263 'assert_not_awaited'): 264 265 # setattr(mock, attribute, wrapper) causes late binding 266 # hence attribute will always be the last value in the loop 267 # Use partial(wrapper, attribute) to ensure the attribute is bound 268 # correctly. 269 setattr(mock, attribute, partial(wrapper, attribute)) 270 271 272def _is_magic(name): 273 return '__%s__' % name[2:-2] == name 274 275 276class _SentinelObject(object): 277 "A unique, named, sentinel object." 278 def __init__(self, name): 279 self.name = name 280 281 def __repr__(self): 282 return 'sentinel.%s' % self.name 283 284 def __reduce__(self): 285 return 'sentinel.%s' % self.name 286 287 288class _Sentinel(object): 289 """Access attributes to return a named object, usable as a sentinel.""" 290 def __init__(self): 291 self._sentinels = {} 292 293 def __getattr__(self, name): 294 if name == '__bases__': 295 # Without this help(unittest.mock) raises an exception 296 raise AttributeError 297 return self._sentinels.setdefault(name, _SentinelObject(name)) 298 299 def __reduce__(self): 300 return 'sentinel' 301 302 303sentinel = _Sentinel() 304 305DEFAULT = sentinel.DEFAULT 306_missing = sentinel.MISSING 307_deleted = sentinel.DELETED 308 309 310_allowed_names = { 311 'return_value', '_mock_return_value', 'side_effect', 312 '_mock_side_effect', '_mock_parent', '_mock_new_parent', 313 '_mock_name', '_mock_new_name' 314} 315 316 317def _delegating_property(name): 318 _allowed_names.add(name) 319 _the_name = '_mock_' + name 320 def _get(self, name=name, _the_name=_the_name): 321 sig = self._mock_delegate 322 if sig is None: 323 return getattr(self, _the_name) 324 return getattr(sig, name) 325 def _set(self, value, name=name, _the_name=_the_name): 326 sig = self._mock_delegate 327 if sig is None: 328 self.__dict__[_the_name] = value 329 else: 330 setattr(sig, name, value) 331 332 return property(_get, _set) 333 334 335 336class _CallList(list): 337 338 def __contains__(self, value): 339 if not isinstance(value, list): 340 return list.__contains__(self, value) 341 len_value = len(value) 342 len_self = len(self) 343 if len_value > len_self: 344 return False 345 346 for i in range(0, len_self - len_value + 1): 347 sub_list = self[i:i+len_value] 348 if sub_list == value: 349 return True 350 return False 351 352 def __repr__(self): 353 return pprint.pformat(list(self)) 354 355 356def _check_and_set_parent(parent, value, name, new_name): 357 value = _extract_mock(value) 358 359 if not _is_instance_mock(value): 360 return False 361 if ((value._mock_name or value._mock_new_name) or 362 (value._mock_parent is not None) or 363 (value._mock_new_parent is not None)): 364 return False 365 366 _parent = parent 367 while _parent is not None: 368 # setting a mock (value) as a child or return value of itself 369 # should not modify the mock 370 if _parent is value: 371 return False 372 _parent = _parent._mock_new_parent 373 374 if new_name: 375 value._mock_new_parent = parent 376 value._mock_new_name = new_name 377 if name: 378 value._mock_parent = parent 379 value._mock_name = name 380 return True 381 382# Internal class to identify if we wrapped an iterator object or not. 383class _MockIter(object): 384 def __init__(self, obj): 385 self.obj = iter(obj) 386 def __next__(self): 387 return next(self.obj) 388 389class Base(object): 390 _mock_return_value = DEFAULT 391 _mock_side_effect = None 392 def __init__(self, /, *args, **kwargs): 393 pass 394 395 396 397class NonCallableMock(Base): 398 """A non-callable version of `Mock`""" 399 400 def __new__(cls, /, *args, **kw): 401 # every instance has its own class 402 # so we can create magic methods on the 403 # class without stomping on other mocks 404 bases = (cls,) 405 if not issubclass(cls, AsyncMockMixin): 406 # Check if spec is an async object or function 407 bound_args = _MOCK_SIG.bind_partial(cls, *args, **kw).arguments 408 spec_arg = bound_args.get('spec_set', bound_args.get('spec')) 409 if spec_arg and _is_async_obj(spec_arg): 410 bases = (AsyncMockMixin, cls) 411 new = type(cls.__name__, bases, {'__doc__': cls.__doc__}) 412 instance = _safe_super(NonCallableMock, cls).__new__(new) 413 return instance 414 415 416 def __init__( 417 self, spec=None, wraps=None, name=None, spec_set=None, 418 parent=None, _spec_state=None, _new_name='', _new_parent=None, 419 _spec_as_instance=False, _eat_self=None, unsafe=False, **kwargs 420 ): 421 if _new_parent is None: 422 _new_parent = parent 423 424 __dict__ = self.__dict__ 425 __dict__['_mock_parent'] = parent 426 __dict__['_mock_name'] = name 427 __dict__['_mock_new_name'] = _new_name 428 __dict__['_mock_new_parent'] = _new_parent 429 __dict__['_mock_sealed'] = False 430 431 if spec_set is not None: 432 spec = spec_set 433 spec_set = True 434 if _eat_self is None: 435 _eat_self = parent is not None 436 437 self._mock_add_spec(spec, spec_set, _spec_as_instance, _eat_self) 438 439 __dict__['_mock_children'] = {} 440 __dict__['_mock_wraps'] = wraps 441 __dict__['_mock_delegate'] = None 442 443 __dict__['_mock_called'] = False 444 __dict__['_mock_call_args'] = None 445 __dict__['_mock_call_count'] = 0 446 __dict__['_mock_call_args_list'] = _CallList() 447 __dict__['_mock_mock_calls'] = _CallList() 448 449 __dict__['method_calls'] = _CallList() 450 __dict__['_mock_unsafe'] = unsafe 451 452 if kwargs: 453 self.configure_mock(**kwargs) 454 455 _safe_super(NonCallableMock, self).__init__( 456 spec, wraps, name, spec_set, parent, 457 _spec_state 458 ) 459 460 461 def attach_mock(self, mock, attribute): 462 """ 463 Attach a mock as an attribute of this one, replacing its name and 464 parent. Calls to the attached mock will be recorded in the 465 `method_calls` and `mock_calls` attributes of this one.""" 466 inner_mock = _extract_mock(mock) 467 468 inner_mock._mock_parent = None 469 inner_mock._mock_new_parent = None 470 inner_mock._mock_name = '' 471 inner_mock._mock_new_name = None 472 473 setattr(self, attribute, mock) 474 475 476 def mock_add_spec(self, spec, spec_set=False): 477 """Add a spec to a mock. `spec` can either be an object or a 478 list of strings. Only attributes on the `spec` can be fetched as 479 attributes from the mock. 480 481 If `spec_set` is True then only attributes on the spec can be set.""" 482 self._mock_add_spec(spec, spec_set) 483 484 485 def _mock_add_spec(self, spec, spec_set, _spec_as_instance=False, 486 _eat_self=False): 487 _spec_class = None 488 _spec_signature = None 489 _spec_asyncs = [] 490 491 for attr in dir(spec): 492 if iscoroutinefunction(getattr(spec, attr, None)): 493 _spec_asyncs.append(attr) 494 495 if spec is not None and not _is_list(spec): 496 if isinstance(spec, type): 497 _spec_class = spec 498 else: 499 _spec_class = type(spec) 500 res = _get_signature_object(spec, 501 _spec_as_instance, _eat_self) 502 _spec_signature = res and res[1] 503 504 spec = dir(spec) 505 506 __dict__ = self.__dict__ 507 __dict__['_spec_class'] = _spec_class 508 __dict__['_spec_set'] = spec_set 509 __dict__['_spec_signature'] = _spec_signature 510 __dict__['_mock_methods'] = spec 511 __dict__['_spec_asyncs'] = _spec_asyncs 512 513 def __get_return_value(self): 514 ret = self._mock_return_value 515 if self._mock_delegate is not None: 516 ret = self._mock_delegate.return_value 517 518 if ret is DEFAULT: 519 ret = self._get_child_mock( 520 _new_parent=self, _new_name='()' 521 ) 522 self.return_value = ret 523 return ret 524 525 526 def __set_return_value(self, value): 527 if self._mock_delegate is not None: 528 self._mock_delegate.return_value = value 529 else: 530 self._mock_return_value = value 531 _check_and_set_parent(self, value, None, '()') 532 533 __return_value_doc = "The value to be returned when the mock is called." 534 return_value = property(__get_return_value, __set_return_value, 535 __return_value_doc) 536 537 538 @property 539 def __class__(self): 540 if self._spec_class is None: 541 return type(self) 542 return self._spec_class 543 544 called = _delegating_property('called') 545 call_count = _delegating_property('call_count') 546 call_args = _delegating_property('call_args') 547 call_args_list = _delegating_property('call_args_list') 548 mock_calls = _delegating_property('mock_calls') 549 550 551 def __get_side_effect(self): 552 delegated = self._mock_delegate 553 if delegated is None: 554 return self._mock_side_effect 555 sf = delegated.side_effect 556 if (sf is not None and not callable(sf) 557 and not isinstance(sf, _MockIter) and not _is_exception(sf)): 558 sf = _MockIter(sf) 559 delegated.side_effect = sf 560 return sf 561 562 def __set_side_effect(self, value): 563 value = _try_iter(value) 564 delegated = self._mock_delegate 565 if delegated is None: 566 self._mock_side_effect = value 567 else: 568 delegated.side_effect = value 569 570 side_effect = property(__get_side_effect, __set_side_effect) 571 572 573 def reset_mock(self, visited=None,*, return_value=False, side_effect=False): 574 "Restore the mock object to its initial state." 575 if visited is None: 576 visited = [] 577 if id(self) in visited: 578 return 579 visited.append(id(self)) 580 581 self.called = False 582 self.call_args = None 583 self.call_count = 0 584 self.mock_calls = _CallList() 585 self.call_args_list = _CallList() 586 self.method_calls = _CallList() 587 588 if return_value: 589 self._mock_return_value = DEFAULT 590 if side_effect: 591 self._mock_side_effect = None 592 593 for child in self._mock_children.values(): 594 if isinstance(child, _SpecState) or child is _deleted: 595 continue 596 child.reset_mock(visited, return_value=return_value, side_effect=side_effect) 597 598 ret = self._mock_return_value 599 if _is_instance_mock(ret) and ret is not self: 600 ret.reset_mock(visited) 601 602 603 def configure_mock(self, /, **kwargs): 604 """Set attributes on the mock through keyword arguments. 605 606 Attributes plus return values and side effects can be set on child 607 mocks using standard dot notation and unpacking a dictionary in the 608 method call: 609 610 >>> attrs = {'method.return_value': 3, 'other.side_effect': KeyError} 611 >>> mock.configure_mock(**attrs)""" 612 for arg, val in sorted(kwargs.items(), 613 # we sort on the number of dots so that 614 # attributes are set before we set attributes on 615 # attributes 616 key=lambda entry: entry[0].count('.')): 617 args = arg.split('.') 618 final = args.pop() 619 obj = self 620 for entry in args: 621 obj = getattr(obj, entry) 622 setattr(obj, final, val) 623 624 625 def __getattr__(self, name): 626 if name in {'_mock_methods', '_mock_unsafe'}: 627 raise AttributeError(name) 628 elif self._mock_methods is not None: 629 if name not in self._mock_methods or name in _all_magics: 630 raise AttributeError("Mock object has no attribute %r" % name) 631 elif _is_magic(name): 632 raise AttributeError(name) 633 if not self._mock_unsafe: 634 if name.startswith(('assert', 'assret')): 635 raise AttributeError("Attributes cannot start with 'assert' " 636 "or 'assret'") 637 638 result = self._mock_children.get(name) 639 if result is _deleted: 640 raise AttributeError(name) 641 elif result is None: 642 wraps = None 643 if self._mock_wraps is not None: 644 # XXXX should we get the attribute without triggering code 645 # execution? 646 wraps = getattr(self._mock_wraps, name) 647 648 result = self._get_child_mock( 649 parent=self, name=name, wraps=wraps, _new_name=name, 650 _new_parent=self 651 ) 652 self._mock_children[name] = result 653 654 elif isinstance(result, _SpecState): 655 result = create_autospec( 656 result.spec, result.spec_set, result.instance, 657 result.parent, result.name 658 ) 659 self._mock_children[name] = result 660 661 return result 662 663 664 def _extract_mock_name(self): 665 _name_list = [self._mock_new_name] 666 _parent = self._mock_new_parent 667 last = self 668 669 dot = '.' 670 if _name_list == ['()']: 671 dot = '' 672 673 while _parent is not None: 674 last = _parent 675 676 _name_list.append(_parent._mock_new_name + dot) 677 dot = '.' 678 if _parent._mock_new_name == '()': 679 dot = '' 680 681 _parent = _parent._mock_new_parent 682 683 _name_list = list(reversed(_name_list)) 684 _first = last._mock_name or 'mock' 685 if len(_name_list) > 1: 686 if _name_list[1] not in ('()', '().'): 687 _first += '.' 688 _name_list[0] = _first 689 return ''.join(_name_list) 690 691 def __repr__(self): 692 name = self._extract_mock_name() 693 694 name_string = '' 695 if name not in ('mock', 'mock.'): 696 name_string = ' name=%r' % name 697 698 spec_string = '' 699 if self._spec_class is not None: 700 spec_string = ' spec=%r' 701 if self._spec_set: 702 spec_string = ' spec_set=%r' 703 spec_string = spec_string % self._spec_class.__name__ 704 return "<%s%s%s id='%s'>" % ( 705 type(self).__name__, 706 name_string, 707 spec_string, 708 id(self) 709 ) 710 711 712 def __dir__(self): 713 """Filter the output of `dir(mock)` to only useful members.""" 714 if not FILTER_DIR: 715 return object.__dir__(self) 716 717 extras = self._mock_methods or [] 718 from_type = dir(type(self)) 719 from_dict = list(self.__dict__) 720 from_child_mocks = [ 721 m_name for m_name, m_value in self._mock_children.items() 722 if m_value is not _deleted] 723 724 from_type = [e for e in from_type if not e.startswith('_')] 725 from_dict = [e for e in from_dict if not e.startswith('_') or 726 _is_magic(e)] 727 return sorted(set(extras + from_type + from_dict + from_child_mocks)) 728 729 730 def __setattr__(self, name, value): 731 if name in _allowed_names: 732 # property setters go through here 733 return object.__setattr__(self, name, value) 734 elif (self._spec_set and self._mock_methods is not None and 735 name not in self._mock_methods and 736 name not in self.__dict__): 737 raise AttributeError("Mock object has no attribute '%s'" % name) 738 elif name in _unsupported_magics: 739 msg = 'Attempting to set unsupported magic method %r.' % name 740 raise AttributeError(msg) 741 elif name in _all_magics: 742 if self._mock_methods is not None and name not in self._mock_methods: 743 raise AttributeError("Mock object has no attribute '%s'" % name) 744 745 if not _is_instance_mock(value): 746 setattr(type(self), name, _get_method(name, value)) 747 original = value 748 value = lambda *args, **kw: original(self, *args, **kw) 749 else: 750 # only set _new_name and not name so that mock_calls is tracked 751 # but not method calls 752 _check_and_set_parent(self, value, None, name) 753 setattr(type(self), name, value) 754 self._mock_children[name] = value 755 elif name == '__class__': 756 self._spec_class = value 757 return 758 else: 759 if _check_and_set_parent(self, value, name, name): 760 self._mock_children[name] = value 761 762 if self._mock_sealed and not hasattr(self, name): 763 mock_name = f'{self._extract_mock_name()}.{name}' 764 raise AttributeError(f'Cannot set {mock_name}') 765 766 return object.__setattr__(self, name, value) 767 768 769 def __delattr__(self, name): 770 if name in _all_magics and name in type(self).__dict__: 771 delattr(type(self), name) 772 if name not in self.__dict__: 773 # for magic methods that are still MagicProxy objects and 774 # not set on the instance itself 775 return 776 777 obj = self._mock_children.get(name, _missing) 778 if name in self.__dict__: 779 _safe_super(NonCallableMock, self).__delattr__(name) 780 elif obj is _deleted: 781 raise AttributeError(name) 782 if obj is not _missing: 783 del self._mock_children[name] 784 self._mock_children[name] = _deleted 785 786 787 def _format_mock_call_signature(self, args, kwargs): 788 name = self._mock_name or 'mock' 789 return _format_call_signature(name, args, kwargs) 790 791 792 def _format_mock_failure_message(self, args, kwargs, action='call'): 793 message = 'expected %s not found.\nExpected: %s\nActual: %s' 794 expected_string = self._format_mock_call_signature(args, kwargs) 795 call_args = self.call_args 796 actual_string = self._format_mock_call_signature(*call_args) 797 return message % (action, expected_string, actual_string) 798 799 800 def _get_call_signature_from_name(self, name): 801 """ 802 * If call objects are asserted against a method/function like obj.meth1 803 then there could be no name for the call object to lookup. Hence just 804 return the spec_signature of the method/function being asserted against. 805 * If the name is not empty then remove () and split by '.' to get 806 list of names to iterate through the children until a potential 807 match is found. A child mock is created only during attribute access 808 so if we get a _SpecState then no attributes of the spec were accessed 809 and can be safely exited. 810 """ 811 if not name: 812 return self._spec_signature 813 814 sig = None 815 names = name.replace('()', '').split('.') 816 children = self._mock_children 817 818 for name in names: 819 child = children.get(name) 820 if child is None or isinstance(child, _SpecState): 821 break 822 else: 823 # If an autospecced object is attached using attach_mock the 824 # child would be a function with mock object as attribute from 825 # which signature has to be derived. 826 child = _extract_mock(child) 827 children = child._mock_children 828 sig = child._spec_signature 829 830 return sig 831 832 833 def _call_matcher(self, _call): 834 """ 835 Given a call (or simply an (args, kwargs) tuple), return a 836 comparison key suitable for matching with other calls. 837 This is a best effort method which relies on the spec's signature, 838 if available, or falls back on the arguments themselves. 839 """ 840 841 if isinstance(_call, tuple) and len(_call) > 2: 842 sig = self._get_call_signature_from_name(_call[0]) 843 else: 844 sig = self._spec_signature 845 846 if sig is not None: 847 if len(_call) == 2: 848 name = '' 849 args, kwargs = _call 850 else: 851 name, args, kwargs = _call 852 try: 853 bound_call = sig.bind(*args, **kwargs) 854 return call(name, bound_call.args, bound_call.kwargs) 855 except TypeError as e: 856 return e.with_traceback(None) 857 else: 858 return _call 859 860 def assert_not_called(self): 861 """assert that the mock was never called. 862 """ 863 if self.call_count != 0: 864 msg = ("Expected '%s' to not have been called. Called %s times.%s" 865 % (self._mock_name or 'mock', 866 self.call_count, 867 self._calls_repr())) 868 raise AssertionError(msg) 869 870 def assert_called(self): 871 """assert that the mock was called at least once 872 """ 873 if self.call_count == 0: 874 msg = ("Expected '%s' to have been called." % 875 (self._mock_name or 'mock')) 876 raise AssertionError(msg) 877 878 def assert_called_once(self): 879 """assert that the mock was called only once. 880 """ 881 if not self.call_count == 1: 882 msg = ("Expected '%s' to have been called once. Called %s times.%s" 883 % (self._mock_name or 'mock', 884 self.call_count, 885 self._calls_repr())) 886 raise AssertionError(msg) 887 888 def assert_called_with(self, /, *args, **kwargs): 889 """assert that the last call was made with the specified arguments. 890 891 Raises an AssertionError if the args and keyword args passed in are 892 different to the last call to the mock.""" 893 if self.call_args is None: 894 expected = self._format_mock_call_signature(args, kwargs) 895 actual = 'not called.' 896 error_message = ('expected call not found.\nExpected: %s\nActual: %s' 897 % (expected, actual)) 898 raise AssertionError(error_message) 899 900 def _error_message(): 901 msg = self._format_mock_failure_message(args, kwargs) 902 return msg 903 expected = self._call_matcher(_Call((args, kwargs), two=True)) 904 actual = self._call_matcher(self.call_args) 905 if actual != expected: 906 cause = expected if isinstance(expected, Exception) else None 907 raise AssertionError(_error_message()) from cause 908 909 910 def assert_called_once_with(self, /, *args, **kwargs): 911 """assert that the mock was called exactly once and that that call was 912 with the specified arguments.""" 913 if not self.call_count == 1: 914 msg = ("Expected '%s' to be called once. Called %s times.%s" 915 % (self._mock_name or 'mock', 916 self.call_count, 917 self._calls_repr())) 918 raise AssertionError(msg) 919 return self.assert_called_with(*args, **kwargs) 920 921 922 def assert_has_calls(self, calls, any_order=False): 923 """assert the mock has been called with the specified calls. 924 The `mock_calls` list is checked for the calls. 925 926 If `any_order` is False (the default) then the calls must be 927 sequential. There can be extra calls before or after the 928 specified calls. 929 930 If `any_order` is True then the calls can be in any order, but 931 they must all appear in `mock_calls`.""" 932 expected = [self._call_matcher(c) for c in calls] 933 cause = next((e for e in expected if isinstance(e, Exception)), None) 934 all_calls = _CallList(self._call_matcher(c) for c in self.mock_calls) 935 if not any_order: 936 if expected not in all_calls: 937 if cause is None: 938 problem = 'Calls not found.' 939 else: 940 problem = ('Error processing expected calls.\n' 941 'Errors: {}').format( 942 [e if isinstance(e, Exception) else None 943 for e in expected]) 944 raise AssertionError( 945 f'{problem}\n' 946 f'Expected: {_CallList(calls)}' 947 f'{self._calls_repr(prefix="Actual").rstrip(".")}' 948 ) from cause 949 return 950 951 all_calls = list(all_calls) 952 953 not_found = [] 954 for kall in expected: 955 try: 956 all_calls.remove(kall) 957 except ValueError: 958 not_found.append(kall) 959 if not_found: 960 raise AssertionError( 961 '%r does not contain all of %r in its call list, ' 962 'found %r instead' % (self._mock_name or 'mock', 963 tuple(not_found), all_calls) 964 ) from cause 965 966 967 def assert_any_call(self, /, *args, **kwargs): 968 """assert the mock has been called with the specified arguments. 969 970 The assert passes if the mock has *ever* been called, unlike 971 `assert_called_with` and `assert_called_once_with` that only pass if 972 the call is the most recent one.""" 973 expected = self._call_matcher(_Call((args, kwargs), two=True)) 974 cause = expected if isinstance(expected, Exception) else None 975 actual = [self._call_matcher(c) for c in self.call_args_list] 976 if cause or expected not in _AnyComparer(actual): 977 expected_string = self._format_mock_call_signature(args, kwargs) 978 raise AssertionError( 979 '%s call not found' % expected_string 980 ) from cause 981 982 983 def _get_child_mock(self, /, **kw): 984 """Create the child mocks for attributes and return value. 985 By default child mocks will be the same type as the parent. 986 Subclasses of Mock may want to override this to customize the way 987 child mocks are made. 988 989 For non-callable mocks the callable variant will be used (rather than 990 any custom subclass).""" 991 _new_name = kw.get("_new_name") 992 if _new_name in self.__dict__['_spec_asyncs']: 993 return AsyncMock(**kw) 994 995 _type = type(self) 996 if issubclass(_type, MagicMock) and _new_name in _async_method_magics: 997 # Any asynchronous magic becomes an AsyncMock 998 klass = AsyncMock 999 elif issubclass(_type, AsyncMockMixin): 1000 if (_new_name in _all_sync_magics or 1001 self._mock_methods and _new_name in self._mock_methods): 1002 # Any synchronous method on AsyncMock becomes a MagicMock 1003 klass = MagicMock 1004 else: 1005 klass = AsyncMock 1006 elif not issubclass(_type, CallableMixin): 1007 if issubclass(_type, NonCallableMagicMock): 1008 klass = MagicMock 1009 elif issubclass(_type, NonCallableMock): 1010 klass = Mock 1011 else: 1012 klass = _type.__mro__[1] 1013 1014 if self._mock_sealed: 1015 attribute = "." + kw["name"] if "name" in kw else "()" 1016 mock_name = self._extract_mock_name() + attribute 1017 raise AttributeError(mock_name) 1018 1019 return klass(**kw) 1020 1021 1022 def _calls_repr(self, prefix="Calls"): 1023 """Renders self.mock_calls as a string. 1024 1025 Example: "\nCalls: [call(1), call(2)]." 1026 1027 If self.mock_calls is empty, an empty string is returned. The 1028 output will be truncated if very long. 1029 """ 1030 if not self.mock_calls: 1031 return "" 1032 return f"\n{prefix}: {safe_repr(self.mock_calls)}." 1033 1034 1035_MOCK_SIG = inspect.signature(NonCallableMock.__init__) 1036 1037 1038class _AnyComparer(list): 1039 """A list which checks if it contains a call which may have an 1040 argument of ANY, flipping the components of item and self from 1041 their traditional locations so that ANY is guaranteed to be on 1042 the left.""" 1043 def __contains__(self, item): 1044 for _call in self: 1045 assert len(item) == len(_call) 1046 if all([ 1047 expected == actual 1048 for expected, actual in zip(item, _call) 1049 ]): 1050 return True 1051 return False 1052 1053 1054def _try_iter(obj): 1055 if obj is None: 1056 return obj 1057 if _is_exception(obj): 1058 return obj 1059 if _callable(obj): 1060 return obj 1061 try: 1062 return iter(obj) 1063 except TypeError: 1064 # XXXX backwards compatibility 1065 # but this will blow up on first call - so maybe we should fail early? 1066 return obj 1067 1068 1069class CallableMixin(Base): 1070 1071 def __init__(self, spec=None, side_effect=None, return_value=DEFAULT, 1072 wraps=None, name=None, spec_set=None, parent=None, 1073 _spec_state=None, _new_name='', _new_parent=None, **kwargs): 1074 self.__dict__['_mock_return_value'] = return_value 1075 _safe_super(CallableMixin, self).__init__( 1076 spec, wraps, name, spec_set, parent, 1077 _spec_state, _new_name, _new_parent, **kwargs 1078 ) 1079 1080 self.side_effect = side_effect 1081 1082 1083 def _mock_check_sig(self, /, *args, **kwargs): 1084 # stub method that can be replaced with one with a specific signature 1085 pass 1086 1087 1088 def __call__(self, /, *args, **kwargs): 1089 # can't use self in-case a function / method we are mocking uses self 1090 # in the signature 1091 self._mock_check_sig(*args, **kwargs) 1092 self._increment_mock_call(*args, **kwargs) 1093 return self._mock_call(*args, **kwargs) 1094 1095 1096 def _mock_call(self, /, *args, **kwargs): 1097 return self._execute_mock_call(*args, **kwargs) 1098 1099 def _increment_mock_call(self, /, *args, **kwargs): 1100 self.called = True 1101 self.call_count += 1 1102 1103 # handle call_args 1104 # needs to be set here so assertions on call arguments pass before 1105 # execution in the case of awaited calls 1106 _call = _Call((args, kwargs), two=True) 1107 self.call_args = _call 1108 self.call_args_list.append(_call) 1109 1110 # initial stuff for method_calls: 1111 do_method_calls = self._mock_parent is not None 1112 method_call_name = self._mock_name 1113 1114 # initial stuff for mock_calls: 1115 mock_call_name = self._mock_new_name 1116 is_a_call = mock_call_name == '()' 1117 self.mock_calls.append(_Call(('', args, kwargs))) 1118 1119 # follow up the chain of mocks: 1120 _new_parent = self._mock_new_parent 1121 while _new_parent is not None: 1122 1123 # handle method_calls: 1124 if do_method_calls: 1125 _new_parent.method_calls.append(_Call((method_call_name, args, kwargs))) 1126 do_method_calls = _new_parent._mock_parent is not None 1127 if do_method_calls: 1128 method_call_name = _new_parent._mock_name + '.' + method_call_name 1129 1130 # handle mock_calls: 1131 this_mock_call = _Call((mock_call_name, args, kwargs)) 1132 _new_parent.mock_calls.append(this_mock_call) 1133 1134 if _new_parent._mock_new_name: 1135 if is_a_call: 1136 dot = '' 1137 else: 1138 dot = '.' 1139 is_a_call = _new_parent._mock_new_name == '()' 1140 mock_call_name = _new_parent._mock_new_name + dot + mock_call_name 1141 1142 # follow the parental chain: 1143 _new_parent = _new_parent._mock_new_parent 1144 1145 def _execute_mock_call(self, /, *args, **kwargs): 1146 # separate from _increment_mock_call so that awaited functions are 1147 # executed separately from their call, also AsyncMock overrides this method 1148 1149 effect = self.side_effect 1150 if effect is not None: 1151 if _is_exception(effect): 1152 raise effect 1153 elif not _callable(effect): 1154 result = next(effect) 1155 if _is_exception(result): 1156 raise result 1157 else: 1158 result = effect(*args, **kwargs) 1159 1160 if result is not DEFAULT: 1161 return result 1162 1163 if self._mock_return_value is not DEFAULT: 1164 return self.return_value 1165 1166 if self._mock_wraps is not None: 1167 return self._mock_wraps(*args, **kwargs) 1168 1169 return self.return_value 1170 1171 1172 1173class Mock(CallableMixin, NonCallableMock): 1174 """ 1175 Create a new `Mock` object. `Mock` takes several optional arguments 1176 that specify the behaviour of the Mock object: 1177 1178 * `spec`: This can be either a list of strings or an existing object (a 1179 class or instance) that acts as the specification for the mock object. If 1180 you pass in an object then a list of strings is formed by calling dir on 1181 the object (excluding unsupported magic attributes and methods). Accessing 1182 any attribute not in this list will raise an `AttributeError`. 1183 1184 If `spec` is an object (rather than a list of strings) then 1185 `mock.__class__` returns the class of the spec object. This allows mocks 1186 to pass `isinstance` tests. 1187 1188 * `spec_set`: A stricter variant of `spec`. If used, attempting to *set* 1189 or get an attribute on the mock that isn't on the object passed as 1190 `spec_set` will raise an `AttributeError`. 1191 1192 * `side_effect`: A function to be called whenever the Mock is called. See 1193 the `side_effect` attribute. Useful for raising exceptions or 1194 dynamically changing return values. The function is called with the same 1195 arguments as the mock, and unless it returns `DEFAULT`, the return 1196 value of this function is used as the return value. 1197 1198 If `side_effect` is an iterable then each call to the mock will return 1199 the next value from the iterable. If any of the members of the iterable 1200 are exceptions they will be raised instead of returned. 1201 1202 * `return_value`: The value returned when the mock is called. By default 1203 this is a new Mock (created on first access). See the 1204 `return_value` attribute. 1205 1206 * `wraps`: Item for the mock object to wrap. If `wraps` is not None then 1207 calling the Mock will pass the call through to the wrapped object 1208 (returning the real result). Attribute access on the mock will return a 1209 Mock object that wraps the corresponding attribute of the wrapped object 1210 (so attempting to access an attribute that doesn't exist will raise an 1211 `AttributeError`). 1212 1213 If the mock has an explicit `return_value` set then calls are not passed 1214 to the wrapped object and the `return_value` is returned instead. 1215 1216 * `name`: If the mock has a name then it will be used in the repr of the 1217 mock. This can be useful for debugging. The name is propagated to child 1218 mocks. 1219 1220 Mocks can also be called with arbitrary keyword arguments. These will be 1221 used to set attributes on the mock after it is created. 1222 """ 1223 1224 1225def _dot_lookup(thing, comp, import_path): 1226 try: 1227 return getattr(thing, comp) 1228 except AttributeError: 1229 __import__(import_path) 1230 return getattr(thing, comp) 1231 1232 1233def _importer(target): 1234 components = target.split('.') 1235 import_path = components.pop(0) 1236 thing = __import__(import_path) 1237 1238 for comp in components: 1239 import_path += ".%s" % comp 1240 thing = _dot_lookup(thing, comp, import_path) 1241 return thing 1242 1243 1244class _patch(object): 1245 1246 attribute_name = None 1247 _active_patches = [] 1248 1249 def __init__( 1250 self, getter, attribute, new, spec, create, 1251 spec_set, autospec, new_callable, kwargs 1252 ): 1253 if new_callable is not None: 1254 if new is not DEFAULT: 1255 raise ValueError( 1256 "Cannot use 'new' and 'new_callable' together" 1257 ) 1258 if autospec is not None: 1259 raise ValueError( 1260 "Cannot use 'autospec' and 'new_callable' together" 1261 ) 1262 1263 self.getter = getter 1264 self.attribute = attribute 1265 self.new = new 1266 self.new_callable = new_callable 1267 self.spec = spec 1268 self.create = create 1269 self.has_local = False 1270 self.spec_set = spec_set 1271 self.autospec = autospec 1272 self.kwargs = kwargs 1273 self.additional_patchers = [] 1274 1275 1276 def copy(self): 1277 patcher = _patch( 1278 self.getter, self.attribute, self.new, self.spec, 1279 self.create, self.spec_set, 1280 self.autospec, self.new_callable, self.kwargs 1281 ) 1282 patcher.attribute_name = self.attribute_name 1283 patcher.additional_patchers = [ 1284 p.copy() for p in self.additional_patchers 1285 ] 1286 return patcher 1287 1288 1289 def __call__(self, func): 1290 if isinstance(func, type): 1291 return self.decorate_class(func) 1292 if inspect.iscoroutinefunction(func): 1293 return self.decorate_async_callable(func) 1294 return self.decorate_callable(func) 1295 1296 1297 def decorate_class(self, klass): 1298 for attr in dir(klass): 1299 if not attr.startswith(patch.TEST_PREFIX): 1300 continue 1301 1302 attr_value = getattr(klass, attr) 1303 if not hasattr(attr_value, "__call__"): 1304 continue 1305 1306 patcher = self.copy() 1307 setattr(klass, attr, patcher(attr_value)) 1308 return klass 1309 1310 1311 @contextlib.contextmanager 1312 def decoration_helper(self, patched, args, keywargs): 1313 extra_args = [] 1314 with contextlib.ExitStack() as exit_stack: 1315 for patching in patched.patchings: 1316 arg = exit_stack.enter_context(patching) 1317 if patching.attribute_name is not None: 1318 keywargs.update(arg) 1319 elif patching.new is DEFAULT: 1320 extra_args.append(arg) 1321 1322 args += tuple(extra_args) 1323 yield (args, keywargs) 1324 1325 1326 def decorate_callable(self, func): 1327 # NB. Keep the method in sync with decorate_async_callable() 1328 if hasattr(func, 'patchings'): 1329 func.patchings.append(self) 1330 return func 1331 1332 @wraps(func) 1333 def patched(*args, **keywargs): 1334 with self.decoration_helper(patched, 1335 args, 1336 keywargs) as (newargs, newkeywargs): 1337 return func(*newargs, **newkeywargs) 1338 1339 patched.patchings = [self] 1340 return patched 1341 1342 1343 def decorate_async_callable(self, func): 1344 # NB. Keep the method in sync with decorate_callable() 1345 if hasattr(func, 'patchings'): 1346 func.patchings.append(self) 1347 return func 1348 1349 @wraps(func) 1350 async def patched(*args, **keywargs): 1351 with self.decoration_helper(patched, 1352 args, 1353 keywargs) as (newargs, newkeywargs): 1354 return await func(*newargs, **newkeywargs) 1355 1356 patched.patchings = [self] 1357 return patched 1358 1359 1360 def get_original(self): 1361 target = self.getter() 1362 name = self.attribute 1363 1364 original = DEFAULT 1365 local = False 1366 1367 try: 1368 original = target.__dict__[name] 1369 except (AttributeError, KeyError): 1370 original = getattr(target, name, DEFAULT) 1371 else: 1372 local = True 1373 1374 if name in _builtins and isinstance(target, ModuleType): 1375 self.create = True 1376 1377 if not self.create and original is DEFAULT: 1378 raise AttributeError( 1379 "%s does not have the attribute %r" % (target, name) 1380 ) 1381 return original, local 1382 1383 1384 def __enter__(self): 1385 """Perform the patch.""" 1386 new, spec, spec_set = self.new, self.spec, self.spec_set 1387 autospec, kwargs = self.autospec, self.kwargs 1388 new_callable = self.new_callable 1389 self.target = self.getter() 1390 1391 # normalise False to None 1392 if spec is False: 1393 spec = None 1394 if spec_set is False: 1395 spec_set = None 1396 if autospec is False: 1397 autospec = None 1398 1399 if spec is not None and autospec is not None: 1400 raise TypeError("Can't specify spec and autospec") 1401 if ((spec is not None or autospec is not None) and 1402 spec_set not in (True, None)): 1403 raise TypeError("Can't provide explicit spec_set *and* spec or autospec") 1404 1405 original, local = self.get_original() 1406 1407 if new is DEFAULT and autospec is None: 1408 inherit = False 1409 if spec is True: 1410 # set spec to the object we are replacing 1411 spec = original 1412 if spec_set is True: 1413 spec_set = original 1414 spec = None 1415 elif spec is not None: 1416 if spec_set is True: 1417 spec_set = spec 1418 spec = None 1419 elif spec_set is True: 1420 spec_set = original 1421 1422 if spec is not None or spec_set is not None: 1423 if original is DEFAULT: 1424 raise TypeError("Can't use 'spec' with create=True") 1425 if isinstance(original, type): 1426 # If we're patching out a class and there is a spec 1427 inherit = True 1428 if spec is None and _is_async_obj(original): 1429 Klass = AsyncMock 1430 else: 1431 Klass = MagicMock 1432 _kwargs = {} 1433 if new_callable is not None: 1434 Klass = new_callable 1435 elif spec is not None or spec_set is not None: 1436 this_spec = spec 1437 if spec_set is not None: 1438 this_spec = spec_set 1439 if _is_list(this_spec): 1440 not_callable = '__call__' not in this_spec 1441 else: 1442 not_callable = not callable(this_spec) 1443 if _is_async_obj(this_spec): 1444 Klass = AsyncMock 1445 elif not_callable: 1446 Klass = NonCallableMagicMock 1447 1448 if spec is not None: 1449 _kwargs['spec'] = spec 1450 if spec_set is not None: 1451 _kwargs['spec_set'] = spec_set 1452 1453 # add a name to mocks 1454 if (isinstance(Klass, type) and 1455 issubclass(Klass, NonCallableMock) and self.attribute): 1456 _kwargs['name'] = self.attribute 1457 1458 _kwargs.update(kwargs) 1459 new = Klass(**_kwargs) 1460 1461 if inherit and _is_instance_mock(new): 1462 # we can only tell if the instance should be callable if the 1463 # spec is not a list 1464 this_spec = spec 1465 if spec_set is not None: 1466 this_spec = spec_set 1467 if (not _is_list(this_spec) and not 1468 _instance_callable(this_spec)): 1469 Klass = NonCallableMagicMock 1470 1471 _kwargs.pop('name') 1472 new.return_value = Klass(_new_parent=new, _new_name='()', 1473 **_kwargs) 1474 elif autospec is not None: 1475 # spec is ignored, new *must* be default, spec_set is treated 1476 # as a boolean. Should we check spec is not None and that spec_set 1477 # is a bool? 1478 if new is not DEFAULT: 1479 raise TypeError( 1480 "autospec creates the mock for you. Can't specify " 1481 "autospec and new." 1482 ) 1483 if original is DEFAULT: 1484 raise TypeError("Can't use 'autospec' with create=True") 1485 spec_set = bool(spec_set) 1486 if autospec is True: 1487 autospec = original 1488 1489 new = create_autospec(autospec, spec_set=spec_set, 1490 _name=self.attribute, **kwargs) 1491 elif kwargs: 1492 # can't set keyword args when we aren't creating the mock 1493 # XXXX If new is a Mock we could call new.configure_mock(**kwargs) 1494 raise TypeError("Can't pass kwargs to a mock we aren't creating") 1495 1496 new_attr = new 1497 1498 self.temp_original = original 1499 self.is_local = local 1500 self._exit_stack = contextlib.ExitStack() 1501 try: 1502 setattr(self.target, self.attribute, new_attr) 1503 if self.attribute_name is not None: 1504 extra_args = {} 1505 if self.new is DEFAULT: 1506 extra_args[self.attribute_name] = new 1507 for patching in self.additional_patchers: 1508 arg = self._exit_stack.enter_context(patching) 1509 if patching.new is DEFAULT: 1510 extra_args.update(arg) 1511 return extra_args 1512 1513 return new 1514 except: 1515 if not self.__exit__(*sys.exc_info()): 1516 raise 1517 1518 def __exit__(self, *exc_info): 1519 """Undo the patch.""" 1520 if self.is_local and self.temp_original is not DEFAULT: 1521 setattr(self.target, self.attribute, self.temp_original) 1522 else: 1523 delattr(self.target, self.attribute) 1524 if not self.create and (not hasattr(self.target, self.attribute) or 1525 self.attribute in ('__doc__', '__module__', 1526 '__defaults__', '__annotations__', 1527 '__kwdefaults__')): 1528 # needed for proxy objects like django settings 1529 setattr(self.target, self.attribute, self.temp_original) 1530 1531 del self.temp_original 1532 del self.is_local 1533 del self.target 1534 exit_stack = self._exit_stack 1535 del self._exit_stack 1536 return exit_stack.__exit__(*exc_info) 1537 1538 1539 def start(self): 1540 """Activate a patch, returning any created mock.""" 1541 result = self.__enter__() 1542 self._active_patches.append(self) 1543 return result 1544 1545 1546 def stop(self): 1547 """Stop an active patch.""" 1548 try: 1549 self._active_patches.remove(self) 1550 except ValueError: 1551 # If the patch hasn't been started this will fail 1552 return None 1553 1554 return self.__exit__(None, None, None) 1555 1556 1557 1558def _get_target(target): 1559 try: 1560 target, attribute = target.rsplit('.', 1) 1561 except (TypeError, ValueError): 1562 raise TypeError("Need a valid target to patch. You supplied: %r" % 1563 (target,)) 1564 getter = lambda: _importer(target) 1565 return getter, attribute 1566 1567 1568def _patch_object( 1569 target, attribute, new=DEFAULT, spec=None, 1570 create=False, spec_set=None, autospec=None, 1571 new_callable=None, **kwargs 1572 ): 1573 """ 1574 patch the named member (`attribute`) on an object (`target`) with a mock 1575 object. 1576 1577 `patch.object` can be used as a decorator, class decorator or a context 1578 manager. Arguments `new`, `spec`, `create`, `spec_set`, 1579 `autospec` and `new_callable` have the same meaning as for `patch`. Like 1580 `patch`, `patch.object` takes arbitrary keyword arguments for configuring 1581 the mock object it creates. 1582 1583 When used as a class decorator `patch.object` honours `patch.TEST_PREFIX` 1584 for choosing which methods to wrap. 1585 """ 1586 if type(target) is str: 1587 raise TypeError( 1588 f"{target!r} must be the actual object to be patched, not a str" 1589 ) 1590 getter = lambda: target 1591 return _patch( 1592 getter, attribute, new, spec, create, 1593 spec_set, autospec, new_callable, kwargs 1594 ) 1595 1596 1597def _patch_multiple(target, spec=None, create=False, spec_set=None, 1598 autospec=None, new_callable=None, **kwargs): 1599 """Perform multiple patches in a single call. It takes the object to be 1600 patched (either as an object or a string to fetch the object by importing) 1601 and keyword arguments for the patches:: 1602 1603 with patch.multiple(settings, FIRST_PATCH='one', SECOND_PATCH='two'): 1604 ... 1605 1606 Use `DEFAULT` as the value if you want `patch.multiple` to create 1607 mocks for you. In this case the created mocks are passed into a decorated 1608 function by keyword, and a dictionary is returned when `patch.multiple` is 1609 used as a context manager. 1610 1611 `patch.multiple` can be used as a decorator, class decorator or a context 1612 manager. The arguments `spec`, `spec_set`, `create`, 1613 `autospec` and `new_callable` have the same meaning as for `patch`. These 1614 arguments will be applied to *all* patches done by `patch.multiple`. 1615 1616 When used as a class decorator `patch.multiple` honours `patch.TEST_PREFIX` 1617 for choosing which methods to wrap. 1618 """ 1619 if type(target) is str: 1620 getter = lambda: _importer(target) 1621 else: 1622 getter = lambda: target 1623 1624 if not kwargs: 1625 raise ValueError( 1626 'Must supply at least one keyword argument with patch.multiple' 1627 ) 1628 # need to wrap in a list for python 3, where items is a view 1629 items = list(kwargs.items()) 1630 attribute, new = items[0] 1631 patcher = _patch( 1632 getter, attribute, new, spec, create, spec_set, 1633 autospec, new_callable, {} 1634 ) 1635 patcher.attribute_name = attribute 1636 for attribute, new in items[1:]: 1637 this_patcher = _patch( 1638 getter, attribute, new, spec, create, spec_set, 1639 autospec, new_callable, {} 1640 ) 1641 this_patcher.attribute_name = attribute 1642 patcher.additional_patchers.append(this_patcher) 1643 return patcher 1644 1645 1646def patch( 1647 target, new=DEFAULT, spec=None, create=False, 1648 spec_set=None, autospec=None, new_callable=None, **kwargs 1649 ): 1650 """ 1651 `patch` acts as a function decorator, class decorator or a context 1652 manager. Inside the body of the function or with statement, the `target` 1653 is patched with a `new` object. When the function/with statement exits 1654 the patch is undone. 1655 1656 If `new` is omitted, then the target is replaced with an 1657 `AsyncMock if the patched object is an async function or a 1658 `MagicMock` otherwise. If `patch` is used as a decorator and `new` is 1659 omitted, the created mock is passed in as an extra argument to the 1660 decorated function. If `patch` is used as a context manager the created 1661 mock is returned by the context manager. 1662 1663 `target` should be a string in the form `'package.module.ClassName'`. The 1664 `target` is imported and the specified object replaced with the `new` 1665 object, so the `target` must be importable from the environment you are 1666 calling `patch` from. The target is imported when the decorated function 1667 is executed, not at decoration time. 1668 1669 The `spec` and `spec_set` keyword arguments are passed to the `MagicMock` 1670 if patch is creating one for you. 1671 1672 In addition you can pass `spec=True` or `spec_set=True`, which causes 1673 patch to pass in the object being mocked as the spec/spec_set object. 1674 1675 `new_callable` allows you to specify a different class, or callable object, 1676 that will be called to create the `new` object. By default `AsyncMock` is 1677 used for async functions and `MagicMock` for the rest. 1678 1679 A more powerful form of `spec` is `autospec`. If you set `autospec=True` 1680 then the mock will be created with a spec from the object being replaced. 1681 All attributes of the mock will also have the spec of the corresponding 1682 attribute of the object being replaced. Methods and functions being 1683 mocked will have their arguments checked and will raise a `TypeError` if 1684 they are called with the wrong signature. For mocks replacing a class, 1685 their return value (the 'instance') will have the same spec as the class. 1686 1687 Instead of `autospec=True` you can pass `autospec=some_object` to use an 1688 arbitrary object as the spec instead of the one being replaced. 1689 1690 By default `patch` will fail to replace attributes that don't exist. If 1691 you pass in `create=True`, and the attribute doesn't exist, patch will 1692 create the attribute for you when the patched function is called, and 1693 delete it again afterwards. This is useful for writing tests against 1694 attributes that your production code creates at runtime. It is off by 1695 default because it can be dangerous. With it switched on you can write 1696 passing tests against APIs that don't actually exist! 1697 1698 Patch can be used as a `TestCase` class decorator. It works by 1699 decorating each test method in the class. This reduces the boilerplate 1700 code when your test methods share a common patchings set. `patch` finds 1701 tests by looking for method names that start with `patch.TEST_PREFIX`. 1702 By default this is `test`, which matches the way `unittest` finds tests. 1703 You can specify an alternative prefix by setting `patch.TEST_PREFIX`. 1704 1705 Patch can be used as a context manager, with the with statement. Here the 1706 patching applies to the indented block after the with statement. If you 1707 use "as" then the patched object will be bound to the name after the 1708 "as"; very useful if `patch` is creating a mock object for you. 1709 1710 `patch` takes arbitrary keyword arguments. These will be passed to 1711 `AsyncMock` if the patched object is asynchronous, to `MagicMock` 1712 otherwise or to `new_callable` if specified. 1713 1714 `patch.dict(...)`, `patch.multiple(...)` and `patch.object(...)` are 1715 available for alternate use-cases. 1716 """ 1717 getter, attribute = _get_target(target) 1718 return _patch( 1719 getter, attribute, new, spec, create, 1720 spec_set, autospec, new_callable, kwargs 1721 ) 1722 1723 1724class _patch_dict(object): 1725 """ 1726 Patch a dictionary, or dictionary like object, and restore the dictionary 1727 to its original state after the test. 1728 1729 `in_dict` can be a dictionary or a mapping like container. If it is a 1730 mapping then it must at least support getting, setting and deleting items 1731 plus iterating over keys. 1732 1733 `in_dict` can also be a string specifying the name of the dictionary, which 1734 will then be fetched by importing it. 1735 1736 `values` can be a dictionary of values to set in the dictionary. `values` 1737 can also be an iterable of `(key, value)` pairs. 1738 1739 If `clear` is True then the dictionary will be cleared before the new 1740 values are set. 1741 1742 `patch.dict` can also be called with arbitrary keyword arguments to set 1743 values in the dictionary:: 1744 1745 with patch.dict('sys.modules', mymodule=Mock(), other_module=Mock()): 1746 ... 1747 1748 `patch.dict` can be used as a context manager, decorator or class 1749 decorator. When used as a class decorator `patch.dict` honours 1750 `patch.TEST_PREFIX` for choosing which methods to wrap. 1751 """ 1752 1753 def __init__(self, in_dict, values=(), clear=False, **kwargs): 1754 self.in_dict = in_dict 1755 # support any argument supported by dict(...) constructor 1756 self.values = dict(values) 1757 self.values.update(kwargs) 1758 self.clear = clear 1759 self._original = None 1760 1761 1762 def __call__(self, f): 1763 if isinstance(f, type): 1764 return self.decorate_class(f) 1765 @wraps(f) 1766 def _inner(*args, **kw): 1767 self._patch_dict() 1768 try: 1769 return f(*args, **kw) 1770 finally: 1771 self._unpatch_dict() 1772 1773 return _inner 1774 1775 1776 def decorate_class(self, klass): 1777 for attr in dir(klass): 1778 attr_value = getattr(klass, attr) 1779 if (attr.startswith(patch.TEST_PREFIX) and 1780 hasattr(attr_value, "__call__")): 1781 decorator = _patch_dict(self.in_dict, self.values, self.clear) 1782 decorated = decorator(attr_value) 1783 setattr(klass, attr, decorated) 1784 return klass 1785 1786 1787 def __enter__(self): 1788 """Patch the dict.""" 1789 self._patch_dict() 1790 return self.in_dict 1791 1792 1793 def _patch_dict(self): 1794 values = self.values 1795 if isinstance(self.in_dict, str): 1796 self.in_dict = _importer(self.in_dict) 1797 in_dict = self.in_dict 1798 clear = self.clear 1799 1800 try: 1801 original = in_dict.copy() 1802 except AttributeError: 1803 # dict like object with no copy method 1804 # must support iteration over keys 1805 original = {} 1806 for key in in_dict: 1807 original[key] = in_dict[key] 1808 self._original = original 1809 1810 if clear: 1811 _clear_dict(in_dict) 1812 1813 try: 1814 in_dict.update(values) 1815 except AttributeError: 1816 # dict like object with no update method 1817 for key in values: 1818 in_dict[key] = values[key] 1819 1820 1821 def _unpatch_dict(self): 1822 in_dict = self.in_dict 1823 original = self._original 1824 1825 _clear_dict(in_dict) 1826 1827 try: 1828 in_dict.update(original) 1829 except AttributeError: 1830 for key in original: 1831 in_dict[key] = original[key] 1832 1833 1834 def __exit__(self, *args): 1835 """Unpatch the dict.""" 1836 if self._original is not None: 1837 self._unpatch_dict() 1838 return False 1839 1840 1841 def start(self): 1842 """Activate a patch, returning any created mock.""" 1843 result = self.__enter__() 1844 _patch._active_patches.append(self) 1845 return result 1846 1847 1848 def stop(self): 1849 """Stop an active patch.""" 1850 try: 1851 _patch._active_patches.remove(self) 1852 except ValueError: 1853 # If the patch hasn't been started this will fail 1854 return None 1855 1856 return self.__exit__(None, None, None) 1857 1858 1859def _clear_dict(in_dict): 1860 try: 1861 in_dict.clear() 1862 except AttributeError: 1863 keys = list(in_dict) 1864 for key in keys: 1865 del in_dict[key] 1866 1867 1868def _patch_stopall(): 1869 """Stop all active patches. LIFO to unroll nested patches.""" 1870 for patch in reversed(_patch._active_patches): 1871 patch.stop() 1872 1873 1874patch.object = _patch_object 1875patch.dict = _patch_dict 1876patch.multiple = _patch_multiple 1877patch.stopall = _patch_stopall 1878patch.TEST_PREFIX = 'test' 1879 1880magic_methods = ( 1881 "lt le gt ge eq ne " 1882 "getitem setitem delitem " 1883 "len contains iter " 1884 "hash str sizeof " 1885 "enter exit " 1886 # we added divmod and rdivmod here instead of numerics 1887 # because there is no idivmod 1888 "divmod rdivmod neg pos abs invert " 1889 "complex int float index " 1890 "round trunc floor ceil " 1891 "bool next " 1892 "fspath " 1893 "aiter " 1894) 1895 1896numerics = ( 1897 "add sub mul matmul div floordiv mod lshift rshift and xor or pow truediv" 1898) 1899inplace = ' '.join('i%s' % n for n in numerics.split()) 1900right = ' '.join('r%s' % n for n in numerics.split()) 1901 1902# not including __prepare__, __instancecheck__, __subclasscheck__ 1903# (as they are metaclass methods) 1904# __del__ is not supported at all as it causes problems if it exists 1905 1906_non_defaults = { 1907 '__get__', '__set__', '__delete__', '__reversed__', '__missing__', 1908 '__reduce__', '__reduce_ex__', '__getinitargs__', '__getnewargs__', 1909 '__getstate__', '__setstate__', '__getformat__', '__setformat__', 1910 '__repr__', '__dir__', '__subclasses__', '__format__', 1911 '__getnewargs_ex__', 1912} 1913 1914 1915def _get_method(name, func): 1916 "Turns a callable object (like a mock) into a real function" 1917 def method(self, /, *args, **kw): 1918 return func(self, *args, **kw) 1919 method.__name__ = name 1920 return method 1921 1922 1923_magics = { 1924 '__%s__' % method for method in 1925 ' '.join([magic_methods, numerics, inplace, right]).split() 1926} 1927 1928# Magic methods used for async `with` statements 1929_async_method_magics = {"__aenter__", "__aexit__", "__anext__"} 1930# Magic methods that are only used with async calls but are synchronous functions themselves 1931_sync_async_magics = {"__aiter__"} 1932_async_magics = _async_method_magics | _sync_async_magics 1933 1934_all_sync_magics = _magics | _non_defaults 1935_all_magics = _all_sync_magics | _async_magics 1936 1937_unsupported_magics = { 1938 '__getattr__', '__setattr__', 1939 '__init__', '__new__', '__prepare__', 1940 '__instancecheck__', '__subclasscheck__', 1941 '__del__' 1942} 1943 1944_calculate_return_value = { 1945 '__hash__': lambda self: object.__hash__(self), 1946 '__str__': lambda self: object.__str__(self), 1947 '__sizeof__': lambda self: object.__sizeof__(self), 1948 '__fspath__': lambda self: f"{type(self).__name__}/{self._extract_mock_name()}/{id(self)}", 1949} 1950 1951_return_values = { 1952 '__lt__': NotImplemented, 1953 '__gt__': NotImplemented, 1954 '__le__': NotImplemented, 1955 '__ge__': NotImplemented, 1956 '__int__': 1, 1957 '__contains__': False, 1958 '__len__': 0, 1959 '__exit__': False, 1960 '__complex__': 1j, 1961 '__float__': 1.0, 1962 '__bool__': True, 1963 '__index__': 1, 1964 '__aexit__': False, 1965} 1966 1967 1968def _get_eq(self): 1969 def __eq__(other): 1970 ret_val = self.__eq__._mock_return_value 1971 if ret_val is not DEFAULT: 1972 return ret_val 1973 if self is other: 1974 return True 1975 return NotImplemented 1976 return __eq__ 1977 1978def _get_ne(self): 1979 def __ne__(other): 1980 if self.__ne__._mock_return_value is not DEFAULT: 1981 return DEFAULT 1982 if self is other: 1983 return False 1984 return NotImplemented 1985 return __ne__ 1986 1987def _get_iter(self): 1988 def __iter__(): 1989 ret_val = self.__iter__._mock_return_value 1990 if ret_val is DEFAULT: 1991 return iter([]) 1992 # if ret_val was already an iterator, then calling iter on it should 1993 # return the iterator unchanged 1994 return iter(ret_val) 1995 return __iter__ 1996 1997def _get_async_iter(self): 1998 def __aiter__(): 1999 ret_val = self.__aiter__._mock_return_value 2000 if ret_val is DEFAULT: 2001 return _AsyncIterator(iter([])) 2002 return _AsyncIterator(iter(ret_val)) 2003 return __aiter__ 2004 2005_side_effect_methods = { 2006 '__eq__': _get_eq, 2007 '__ne__': _get_ne, 2008 '__iter__': _get_iter, 2009 '__aiter__': _get_async_iter 2010} 2011 2012 2013 2014def _set_return_value(mock, method, name): 2015 fixed = _return_values.get(name, DEFAULT) 2016 if fixed is not DEFAULT: 2017 method.return_value = fixed 2018 return 2019 2020 return_calculator = _calculate_return_value.get(name) 2021 if return_calculator is not None: 2022 return_value = return_calculator(mock) 2023 method.return_value = return_value 2024 return 2025 2026 side_effector = _side_effect_methods.get(name) 2027 if side_effector is not None: 2028 method.side_effect = side_effector(mock) 2029 2030 2031 2032class MagicMixin(Base): 2033 def __init__(self, /, *args, **kw): 2034 self._mock_set_magics() # make magic work for kwargs in init 2035 _safe_super(MagicMixin, self).__init__(*args, **kw) 2036 self._mock_set_magics() # fix magic broken by upper level init 2037 2038 2039 def _mock_set_magics(self): 2040 orig_magics = _magics | _async_method_magics 2041 these_magics = orig_magics 2042 2043 if getattr(self, "_mock_methods", None) is not None: 2044 these_magics = orig_magics.intersection(self._mock_methods) 2045 2046 remove_magics = set() 2047 remove_magics = orig_magics - these_magics 2048 2049 for entry in remove_magics: 2050 if entry in type(self).__dict__: 2051 # remove unneeded magic methods 2052 delattr(self, entry) 2053 2054 # don't overwrite existing attributes if called a second time 2055 these_magics = these_magics - set(type(self).__dict__) 2056 2057 _type = type(self) 2058 for entry in these_magics: 2059 setattr(_type, entry, MagicProxy(entry, self)) 2060 2061 2062 2063class NonCallableMagicMock(MagicMixin, NonCallableMock): 2064 """A version of `MagicMock` that isn't callable.""" 2065 def mock_add_spec(self, spec, spec_set=False): 2066 """Add a spec to a mock. `spec` can either be an object or a 2067 list of strings. Only attributes on the `spec` can be fetched as 2068 attributes from the mock. 2069 2070 If `spec_set` is True then only attributes on the spec can be set.""" 2071 self._mock_add_spec(spec, spec_set) 2072 self._mock_set_magics() 2073 2074 2075class AsyncMagicMixin(MagicMixin): 2076 def __init__(self, /, *args, **kw): 2077 self._mock_set_magics() # make magic work for kwargs in init 2078 _safe_super(AsyncMagicMixin, self).__init__(*args, **kw) 2079 self._mock_set_magics() # fix magic broken by upper level init 2080 2081class MagicMock(MagicMixin, Mock): 2082 """ 2083 MagicMock is a subclass of Mock with default implementations 2084 of most of the magic methods. You can use MagicMock without having to 2085 configure the magic methods yourself. 2086 2087 If you use the `spec` or `spec_set` arguments then *only* magic 2088 methods that exist in the spec will be created. 2089 2090 Attributes and the return value of a `MagicMock` will also be `MagicMocks`. 2091 """ 2092 def mock_add_spec(self, spec, spec_set=False): 2093 """Add a spec to a mock. `spec` can either be an object or a 2094 list of strings. Only attributes on the `spec` can be fetched as 2095 attributes from the mock. 2096 2097 If `spec_set` is True then only attributes on the spec can be set.""" 2098 self._mock_add_spec(spec, spec_set) 2099 self._mock_set_magics() 2100 2101 2102 2103class MagicProxy(Base): 2104 def __init__(self, name, parent): 2105 self.name = name 2106 self.parent = parent 2107 2108 def create_mock(self): 2109 entry = self.name 2110 parent = self.parent 2111 m = parent._get_child_mock(name=entry, _new_name=entry, 2112 _new_parent=parent) 2113 setattr(parent, entry, m) 2114 _set_return_value(parent, m, entry) 2115 return m 2116 2117 def __get__(self, obj, _type=None): 2118 return self.create_mock() 2119 2120 2121class AsyncMockMixin(Base): 2122 await_count = _delegating_property('await_count') 2123 await_args = _delegating_property('await_args') 2124 await_args_list = _delegating_property('await_args_list') 2125 2126 def __init__(self, /, *args, **kwargs): 2127 super().__init__(*args, **kwargs) 2128 # iscoroutinefunction() checks _is_coroutine property to say if an 2129 # object is a coroutine. Without this check it looks to see if it is a 2130 # function/method, which in this case it is not (since it is an 2131 # AsyncMock). 2132 # It is set through __dict__ because when spec_set is True, this 2133 # attribute is likely undefined. 2134 self.__dict__['_is_coroutine'] = asyncio.coroutines._is_coroutine 2135 self.__dict__['_mock_await_count'] = 0 2136 self.__dict__['_mock_await_args'] = None 2137 self.__dict__['_mock_await_args_list'] = _CallList() 2138 code_mock = NonCallableMock(spec_set=CodeType) 2139 code_mock.co_flags = inspect.CO_COROUTINE 2140 self.__dict__['__code__'] = code_mock 2141 2142 async def _execute_mock_call(self, /, *args, **kwargs): 2143 # This is nearly just like super(), except for special handling 2144 # of coroutines 2145 2146 _call = _Call((args, kwargs), two=True) 2147 self.await_count += 1 2148 self.await_args = _call 2149 self.await_args_list.append(_call) 2150 2151 effect = self.side_effect 2152 if effect is not None: 2153 if _is_exception(effect): 2154 raise effect 2155 elif not _callable(effect): 2156 try: 2157 result = next(effect) 2158 except StopIteration: 2159 # It is impossible to propogate a StopIteration 2160 # through coroutines because of PEP 479 2161 raise StopAsyncIteration 2162 if _is_exception(result): 2163 raise result 2164 elif iscoroutinefunction(effect): 2165 result = await effect(*args, **kwargs) 2166 else: 2167 result = effect(*args, **kwargs) 2168 2169 if result is not DEFAULT: 2170 return result 2171 2172 if self._mock_return_value is not DEFAULT: 2173 return self.return_value 2174 2175 if self._mock_wraps is not None: 2176 if iscoroutinefunction(self._mock_wraps): 2177 return await self._mock_wraps(*args, **kwargs) 2178 return self._mock_wraps(*args, **kwargs) 2179 2180 return self.return_value 2181 2182 def assert_awaited(self): 2183 """ 2184 Assert that the mock was awaited at least once. 2185 """ 2186 if self.await_count == 0: 2187 msg = f"Expected {self._mock_name or 'mock'} to have been awaited." 2188 raise AssertionError(msg) 2189 2190 def assert_awaited_once(self): 2191 """ 2192 Assert that the mock was awaited exactly once. 2193 """ 2194 if not self.await_count == 1: 2195 msg = (f"Expected {self._mock_name or 'mock'} to have been awaited once." 2196 f" Awaited {self.await_count} times.") 2197 raise AssertionError(msg) 2198 2199 def assert_awaited_with(self, /, *args, **kwargs): 2200 """ 2201 Assert that the last await was with the specified arguments. 2202 """ 2203 if self.await_args is None: 2204 expected = self._format_mock_call_signature(args, kwargs) 2205 raise AssertionError(f'Expected await: {expected}\nNot awaited') 2206 2207 def _error_message(): 2208 msg = self._format_mock_failure_message(args, kwargs, action='await') 2209 return msg 2210 2211 expected = self._call_matcher(_Call((args, kwargs), two=True)) 2212 actual = self._call_matcher(self.await_args) 2213 if actual != expected: 2214 cause = expected if isinstance(expected, Exception) else None 2215 raise AssertionError(_error_message()) from cause 2216 2217 def assert_awaited_once_with(self, /, *args, **kwargs): 2218 """ 2219 Assert that the mock was awaited exactly once and with the specified 2220 arguments. 2221 """ 2222 if not self.await_count == 1: 2223 msg = (f"Expected {self._mock_name or 'mock'} to have been awaited once." 2224 f" Awaited {self.await_count} times.") 2225 raise AssertionError(msg) 2226 return self.assert_awaited_with(*args, **kwargs) 2227 2228 def assert_any_await(self, /, *args, **kwargs): 2229 """ 2230 Assert the mock has ever been awaited with the specified arguments. 2231 """ 2232 expected = self._call_matcher(_Call((args, kwargs), two=True)) 2233 cause = expected if isinstance(expected, Exception) else None 2234 actual = [self._call_matcher(c) for c in self.await_args_list] 2235 if cause or expected not in _AnyComparer(actual): 2236 expected_string = self._format_mock_call_signature(args, kwargs) 2237 raise AssertionError( 2238 '%s await not found' % expected_string 2239 ) from cause 2240 2241 def assert_has_awaits(self, calls, any_order=False): 2242 """ 2243 Assert the mock has been awaited with the specified calls. 2244 The :attr:`await_args_list` list is checked for the awaits. 2245 2246 If `any_order` is False (the default) then the awaits must be 2247 sequential. There can be extra calls before or after the 2248 specified awaits. 2249 2250 If `any_order` is True then the awaits can be in any order, but 2251 they must all appear in :attr:`await_args_list`. 2252 """ 2253 expected = [self._call_matcher(c) for c in calls] 2254 cause = next((e for e in expected if isinstance(e, Exception)), None) 2255 all_awaits = _CallList(self._call_matcher(c) for c in self.await_args_list) 2256 if not any_order: 2257 if expected not in all_awaits: 2258 if cause is None: 2259 problem = 'Awaits not found.' 2260 else: 2261 problem = ('Error processing expected awaits.\n' 2262 'Errors: {}').format( 2263 [e if isinstance(e, Exception) else None 2264 for e in expected]) 2265 raise AssertionError( 2266 f'{problem}\n' 2267 f'Expected: {_CallList(calls)}\n' 2268 f'Actual: {self.await_args_list}' 2269 ) from cause 2270 return 2271 2272 all_awaits = list(all_awaits) 2273 2274 not_found = [] 2275 for kall in expected: 2276 try: 2277 all_awaits.remove(kall) 2278 except ValueError: 2279 not_found.append(kall) 2280 if not_found: 2281 raise AssertionError( 2282 '%r not all found in await list' % (tuple(not_found),) 2283 ) from cause 2284 2285 def assert_not_awaited(self): 2286 """ 2287 Assert that the mock was never awaited. 2288 """ 2289 if self.await_count != 0: 2290 msg = (f"Expected {self._mock_name or 'mock'} to not have been awaited." 2291 f" Awaited {self.await_count} times.") 2292 raise AssertionError(msg) 2293 2294 def reset_mock(self, /, *args, **kwargs): 2295 """ 2296 See :func:`.Mock.reset_mock()` 2297 """ 2298 super().reset_mock(*args, **kwargs) 2299 self.await_count = 0 2300 self.await_args = None 2301 self.await_args_list = _CallList() 2302 2303 2304class AsyncMock(AsyncMockMixin, AsyncMagicMixin, Mock): 2305 """ 2306 Enhance :class:`Mock` with features allowing to mock 2307 an async function. 2308 2309 The :class:`AsyncMock` object will behave so the object is 2310 recognized as an async function, and the result of a call is an awaitable: 2311 2312 >>> mock = AsyncMock() 2313 >>> iscoroutinefunction(mock) 2314 True 2315 >>> inspect.isawaitable(mock()) 2316 True 2317 2318 2319 The result of ``mock()`` is an async function which will have the outcome 2320 of ``side_effect`` or ``return_value``: 2321 2322 - if ``side_effect`` is a function, the async function will return the 2323 result of that function, 2324 - if ``side_effect`` is an exception, the async function will raise the 2325 exception, 2326 - if ``side_effect`` is an iterable, the async function will return the 2327 next value of the iterable, however, if the sequence of result is 2328 exhausted, ``StopIteration`` is raised immediately, 2329 - if ``side_effect`` is not defined, the async function will return the 2330 value defined by ``return_value``, hence, by default, the async function 2331 returns a new :class:`AsyncMock` object. 2332 2333 If the outcome of ``side_effect`` or ``return_value`` is an async function, 2334 the mock async function obtained when the mock object is called will be this 2335 async function itself (and not an async function returning an async 2336 function). 2337 2338 The test author can also specify a wrapped object with ``wraps``. In this 2339 case, the :class:`Mock` object behavior is the same as with an 2340 :class:`.Mock` object: the wrapped object may have methods 2341 defined as async function functions. 2342 2343 Based on Martin Richard's asynctest project. 2344 """ 2345 2346 2347class _ANY(object): 2348 "A helper object that compares equal to everything." 2349 2350 def __eq__(self, other): 2351 return True 2352 2353 def __ne__(self, other): 2354 return False 2355 2356 def __repr__(self): 2357 return '<ANY>' 2358 2359ANY = _ANY() 2360 2361 2362 2363def _format_call_signature(name, args, kwargs): 2364 message = '%s(%%s)' % name 2365 formatted_args = '' 2366 args_string = ', '.join([repr(arg) for arg in args]) 2367 kwargs_string = ', '.join([ 2368 '%s=%r' % (key, value) for key, value in kwargs.items() 2369 ]) 2370 if args_string: 2371 formatted_args = args_string 2372 if kwargs_string: 2373 if formatted_args: 2374 formatted_args += ', ' 2375 formatted_args += kwargs_string 2376 2377 return message % formatted_args 2378 2379 2380 2381class _Call(tuple): 2382 """ 2383 A tuple for holding the results of a call to a mock, either in the form 2384 `(args, kwargs)` or `(name, args, kwargs)`. 2385 2386 If args or kwargs are empty then a call tuple will compare equal to 2387 a tuple without those values. This makes comparisons less verbose:: 2388 2389 _Call(('name', (), {})) == ('name',) 2390 _Call(('name', (1,), {})) == ('name', (1,)) 2391 _Call(((), {'a': 'b'})) == ({'a': 'b'},) 2392 2393 The `_Call` object provides a useful shortcut for comparing with call:: 2394 2395 _Call(((1, 2), {'a': 3})) == call(1, 2, a=3) 2396 _Call(('foo', (1, 2), {'a': 3})) == call.foo(1, 2, a=3) 2397 2398 If the _Call has no name then it will match any name. 2399 """ 2400 def __new__(cls, value=(), name='', parent=None, two=False, 2401 from_kall=True): 2402 args = () 2403 kwargs = {} 2404 _len = len(value) 2405 if _len == 3: 2406 name, args, kwargs = value 2407 elif _len == 2: 2408 first, second = value 2409 if isinstance(first, str): 2410 name = first 2411 if isinstance(second, tuple): 2412 args = second 2413 else: 2414 kwargs = second 2415 else: 2416 args, kwargs = first, second 2417 elif _len == 1: 2418 value, = value 2419 if isinstance(value, str): 2420 name = value 2421 elif isinstance(value, tuple): 2422 args = value 2423 else: 2424 kwargs = value 2425 2426 if two: 2427 return tuple.__new__(cls, (args, kwargs)) 2428 2429 return tuple.__new__(cls, (name, args, kwargs)) 2430 2431 2432 def __init__(self, value=(), name=None, parent=None, two=False, 2433 from_kall=True): 2434 self._mock_name = name 2435 self._mock_parent = parent 2436 self._mock_from_kall = from_kall 2437 2438 2439 def __eq__(self, other): 2440 try: 2441 len_other = len(other) 2442 except TypeError: 2443 return NotImplemented 2444 2445 self_name = '' 2446 if len(self) == 2: 2447 self_args, self_kwargs = self 2448 else: 2449 self_name, self_args, self_kwargs = self 2450 2451 if (getattr(self, '_mock_parent', None) and getattr(other, '_mock_parent', None) 2452 and self._mock_parent != other._mock_parent): 2453 return False 2454 2455 other_name = '' 2456 if len_other == 0: 2457 other_args, other_kwargs = (), {} 2458 elif len_other == 3: 2459 other_name, other_args, other_kwargs = other 2460 elif len_other == 1: 2461 value, = other 2462 if isinstance(value, tuple): 2463 other_args = value 2464 other_kwargs = {} 2465 elif isinstance(value, str): 2466 other_name = value 2467 other_args, other_kwargs = (), {} 2468 else: 2469 other_args = () 2470 other_kwargs = value 2471 elif len_other == 2: 2472 # could be (name, args) or (name, kwargs) or (args, kwargs) 2473 first, second = other 2474 if isinstance(first, str): 2475 other_name = first 2476 if isinstance(second, tuple): 2477 other_args, other_kwargs = second, {} 2478 else: 2479 other_args, other_kwargs = (), second 2480 else: 2481 other_args, other_kwargs = first, second 2482 else: 2483 return False 2484 2485 if self_name and other_name != self_name: 2486 return False 2487 2488 # this order is important for ANY to work! 2489 return (other_args, other_kwargs) == (self_args, self_kwargs) 2490 2491 2492 __ne__ = object.__ne__ 2493 2494 2495 def __call__(self, /, *args, **kwargs): 2496 if self._mock_name is None: 2497 return _Call(('', args, kwargs), name='()') 2498 2499 name = self._mock_name + '()' 2500 return _Call((self._mock_name, args, kwargs), name=name, parent=self) 2501 2502 2503 def __getattr__(self, attr): 2504 if self._mock_name is None: 2505 return _Call(name=attr, from_kall=False) 2506 name = '%s.%s' % (self._mock_name, attr) 2507 return _Call(name=name, parent=self, from_kall=False) 2508 2509 2510 def __getattribute__(self, attr): 2511 if attr in tuple.__dict__: 2512 raise AttributeError 2513 return tuple.__getattribute__(self, attr) 2514 2515 2516 def _get_call_arguments(self): 2517 if len(self) == 2: 2518 args, kwargs = self 2519 else: 2520 name, args, kwargs = self 2521 2522 return args, kwargs 2523 2524 @property 2525 def args(self): 2526 return self._get_call_arguments()[0] 2527 2528 @property 2529 def kwargs(self): 2530 return self._get_call_arguments()[1] 2531 2532 def __repr__(self): 2533 if not self._mock_from_kall: 2534 name = self._mock_name or 'call' 2535 if name.startswith('()'): 2536 name = 'call%s' % name 2537 return name 2538 2539 if len(self) == 2: 2540 name = 'call' 2541 args, kwargs = self 2542 else: 2543 name, args, kwargs = self 2544 if not name: 2545 name = 'call' 2546 elif not name.startswith('()'): 2547 name = 'call.%s' % name 2548 else: 2549 name = 'call%s' % name 2550 return _format_call_signature(name, args, kwargs) 2551 2552 2553 def call_list(self): 2554 """For a call object that represents multiple calls, `call_list` 2555 returns a list of all the intermediate calls as well as the 2556 final call.""" 2557 vals = [] 2558 thing = self 2559 while thing is not None: 2560 if thing._mock_from_kall: 2561 vals.append(thing) 2562 thing = thing._mock_parent 2563 return _CallList(reversed(vals)) 2564 2565 2566call = _Call(from_kall=False) 2567 2568 2569def create_autospec(spec, spec_set=False, instance=False, _parent=None, 2570 _name=None, **kwargs): 2571 """Create a mock object using another object as a spec. Attributes on the 2572 mock will use the corresponding attribute on the `spec` object as their 2573 spec. 2574 2575 Functions or methods being mocked will have their arguments checked 2576 to check that they are called with the correct signature. 2577 2578 If `spec_set` is True then attempting to set attributes that don't exist 2579 on the spec object will raise an `AttributeError`. 2580 2581 If a class is used as a spec then the return value of the mock (the 2582 instance of the class) will have the same spec. You can use a class as the 2583 spec for an instance object by passing `instance=True`. The returned mock 2584 will only be callable if instances of the mock are callable. 2585 2586 `create_autospec` also takes arbitrary keyword arguments that are passed to 2587 the constructor of the created mock.""" 2588 if _is_list(spec): 2589 # can't pass a list instance to the mock constructor as it will be 2590 # interpreted as a list of strings 2591 spec = type(spec) 2592 2593 is_type = isinstance(spec, type) 2594 is_async_func = _is_async_func(spec) 2595 _kwargs = {'spec': spec} 2596 if spec_set: 2597 _kwargs = {'spec_set': spec} 2598 elif spec is None: 2599 # None we mock with a normal mock without a spec 2600 _kwargs = {} 2601 if _kwargs and instance: 2602 _kwargs['_spec_as_instance'] = True 2603 2604 _kwargs.update(kwargs) 2605 2606 Klass = MagicMock 2607 if inspect.isdatadescriptor(spec): 2608 # descriptors don't have a spec 2609 # because we don't know what type they return 2610 _kwargs = {} 2611 elif is_async_func: 2612 if instance: 2613 raise RuntimeError("Instance can not be True when create_autospec " 2614 "is mocking an async function") 2615 Klass = AsyncMock 2616 elif not _callable(spec): 2617 Klass = NonCallableMagicMock 2618 elif is_type and instance and not _instance_callable(spec): 2619 Klass = NonCallableMagicMock 2620 2621 _name = _kwargs.pop('name', _name) 2622 2623 _new_name = _name 2624 if _parent is None: 2625 # for a top level object no _new_name should be set 2626 _new_name = '' 2627 2628 mock = Klass(parent=_parent, _new_parent=_parent, _new_name=_new_name, 2629 name=_name, **_kwargs) 2630 2631 if isinstance(spec, FunctionTypes): 2632 # should only happen at the top level because we don't 2633 # recurse for functions 2634 mock = _set_signature(mock, spec) 2635 if is_async_func: 2636 _setup_async_mock(mock) 2637 else: 2638 _check_signature(spec, mock, is_type, instance) 2639 2640 if _parent is not None and not instance: 2641 _parent._mock_children[_name] = mock 2642 2643 if is_type and not instance and 'return_value' not in kwargs: 2644 mock.return_value = create_autospec(spec, spec_set, instance=True, 2645 _name='()', _parent=mock) 2646 2647 for entry in dir(spec): 2648 if _is_magic(entry): 2649 # MagicMock already does the useful magic methods for us 2650 continue 2651 2652 # XXXX do we need a better way of getting attributes without 2653 # triggering code execution (?) Probably not - we need the actual 2654 # object to mock it so we would rather trigger a property than mock 2655 # the property descriptor. Likewise we want to mock out dynamically 2656 # provided attributes. 2657 # XXXX what about attributes that raise exceptions other than 2658 # AttributeError on being fetched? 2659 # we could be resilient against it, or catch and propagate the 2660 # exception when the attribute is fetched from the mock 2661 try: 2662 original = getattr(spec, entry) 2663 except AttributeError: 2664 continue 2665 2666 kwargs = {'spec': original} 2667 if spec_set: 2668 kwargs = {'spec_set': original} 2669 2670 if not isinstance(original, FunctionTypes): 2671 new = _SpecState(original, spec_set, mock, entry, instance) 2672 mock._mock_children[entry] = new 2673 else: 2674 parent = mock 2675 if isinstance(spec, FunctionTypes): 2676 parent = mock.mock 2677 2678 skipfirst = _must_skip(spec, entry, is_type) 2679 kwargs['_eat_self'] = skipfirst 2680 if iscoroutinefunction(original): 2681 child_klass = AsyncMock 2682 else: 2683 child_klass = MagicMock 2684 new = child_klass(parent=parent, name=entry, _new_name=entry, 2685 _new_parent=parent, 2686 **kwargs) 2687 mock._mock_children[entry] = new 2688 _check_signature(original, new, skipfirst=skipfirst) 2689 2690 # so functions created with _set_signature become instance attributes, 2691 # *plus* their underlying mock exists in _mock_children of the parent 2692 # mock. Adding to _mock_children may be unnecessary where we are also 2693 # setting as an instance attribute? 2694 if isinstance(new, FunctionTypes): 2695 setattr(mock, entry, new) 2696 2697 return mock 2698 2699 2700def _must_skip(spec, entry, is_type): 2701 """ 2702 Return whether we should skip the first argument on spec's `entry` 2703 attribute. 2704 """ 2705 if not isinstance(spec, type): 2706 if entry in getattr(spec, '__dict__', {}): 2707 # instance attribute - shouldn't skip 2708 return False 2709 spec = spec.__class__ 2710 2711 for klass in spec.__mro__: 2712 result = klass.__dict__.get(entry, DEFAULT) 2713 if result is DEFAULT: 2714 continue 2715 if isinstance(result, (staticmethod, classmethod)): 2716 return False 2717 elif isinstance(result, FunctionTypes): 2718 # Normal method => skip if looked up on type 2719 # (if looked up on instance, self is already skipped) 2720 return is_type 2721 else: 2722 return False 2723 2724 # function is a dynamically provided attribute 2725 return is_type 2726 2727 2728class _SpecState(object): 2729 2730 def __init__(self, spec, spec_set=False, parent=None, 2731 name=None, ids=None, instance=False): 2732 self.spec = spec 2733 self.ids = ids 2734 self.spec_set = spec_set 2735 self.parent = parent 2736 self.instance = instance 2737 self.name = name 2738 2739 2740FunctionTypes = ( 2741 # python function 2742 type(create_autospec), 2743 # instance method 2744 type(ANY.__eq__), 2745) 2746 2747 2748file_spec = None 2749 2750 2751def _to_stream(read_data): 2752 if isinstance(read_data, bytes): 2753 return io.BytesIO(read_data) 2754 else: 2755 return io.StringIO(read_data) 2756 2757 2758def mock_open(mock=None, read_data=''): 2759 """ 2760 A helper function to create a mock to replace the use of `open`. It works 2761 for `open` called directly or used as a context manager. 2762 2763 The `mock` argument is the mock object to configure. If `None` (the 2764 default) then a `MagicMock` will be created for you, with the API limited 2765 to methods or attributes available on standard file handles. 2766 2767 `read_data` is a string for the `read`, `readline` and `readlines` of the 2768 file handle to return. This is an empty string by default. 2769 """ 2770 _read_data = _to_stream(read_data) 2771 _state = [_read_data, None] 2772 2773 def _readlines_side_effect(*args, **kwargs): 2774 if handle.readlines.return_value is not None: 2775 return handle.readlines.return_value 2776 return _state[0].readlines(*args, **kwargs) 2777 2778 def _read_side_effect(*args, **kwargs): 2779 if handle.read.return_value is not None: 2780 return handle.read.return_value 2781 return _state[0].read(*args, **kwargs) 2782 2783 def _readline_side_effect(*args, **kwargs): 2784 yield from _iter_side_effect() 2785 while True: 2786 yield _state[0].readline(*args, **kwargs) 2787 2788 def _iter_side_effect(): 2789 if handle.readline.return_value is not None: 2790 while True: 2791 yield handle.readline.return_value 2792 for line in _state[0]: 2793 yield line 2794 2795 def _next_side_effect(): 2796 if handle.readline.return_value is not None: 2797 return handle.readline.return_value 2798 return next(_state[0]) 2799 2800 global file_spec 2801 if file_spec is None: 2802 import _io 2803 file_spec = list(set(dir(_io.TextIOWrapper)).union(set(dir(_io.BytesIO)))) 2804 2805 if mock is None: 2806 mock = MagicMock(name='open', spec=open) 2807 2808 handle = MagicMock(spec=file_spec) 2809 handle.__enter__.return_value = handle 2810 2811 handle.write.return_value = None 2812 handle.read.return_value = None 2813 handle.readline.return_value = None 2814 handle.readlines.return_value = None 2815 2816 handle.read.side_effect = _read_side_effect 2817 _state[1] = _readline_side_effect() 2818 handle.readline.side_effect = _state[1] 2819 handle.readlines.side_effect = _readlines_side_effect 2820 handle.__iter__.side_effect = _iter_side_effect 2821 handle.__next__.side_effect = _next_side_effect 2822 2823 def reset_data(*args, **kwargs): 2824 _state[0] = _to_stream(read_data) 2825 if handle.readline.side_effect == _state[1]: 2826 # Only reset the side effect if the user hasn't overridden it. 2827 _state[1] = _readline_side_effect() 2828 handle.readline.side_effect = _state[1] 2829 return DEFAULT 2830 2831 mock.side_effect = reset_data 2832 mock.return_value = handle 2833 return mock 2834 2835 2836class PropertyMock(Mock): 2837 """ 2838 A mock intended to be used as a property, or other descriptor, on a class. 2839 `PropertyMock` provides `__get__` and `__set__` methods so you can specify 2840 a return value when it is fetched. 2841 2842 Fetching a `PropertyMock` instance from an object calls the mock, with 2843 no args. Setting it calls the mock with the value being set. 2844 """ 2845 def _get_child_mock(self, /, **kwargs): 2846 return MagicMock(**kwargs) 2847 2848 def __get__(self, obj, obj_type=None): 2849 return self() 2850 def __set__(self, obj, val): 2851 self(val) 2852 2853 2854def seal(mock): 2855 """Disable the automatic generation of child mocks. 2856 2857 Given an input Mock, seals it to ensure no further mocks will be generated 2858 when accessing an attribute that was not already defined. 2859 2860 The operation recursively seals the mock passed in, meaning that 2861 the mock itself, any mocks generated by accessing one of its attributes, 2862 and all assigned mocks without a name or spec will be sealed. 2863 """ 2864 mock._mock_sealed = True 2865 for attr in dir(mock): 2866 try: 2867 m = getattr(mock, attr) 2868 except AttributeError: 2869 continue 2870 if not isinstance(m, NonCallableMock): 2871 continue 2872 if m._mock_new_parent is mock: 2873 seal(m) 2874 2875 2876class _AsyncIterator: 2877 """ 2878 Wraps an iterator in an asynchronous iterator. 2879 """ 2880 def __init__(self, iterator): 2881 self.iterator = iterator 2882 code_mock = NonCallableMock(spec_set=CodeType) 2883 code_mock.co_flags = inspect.CO_ITERABLE_COROUTINE 2884 self.__dict__['__code__'] = code_mock 2885 2886 async def __anext__(self): 2887 try: 2888 return next(self.iterator) 2889 except StopIteration: 2890 pass 2891 raise StopAsyncIteration 2892