1# mock.py 2# Test tools for mocking and patching. 3# Maintained by Michael Foord 4# Backport for other versions of Python available from 5# https://pypi.org/project/mock 6 7__all__ = ( 8 'Mock', 9 'MagicMock', 10 'patch', 11 'sentinel', 12 'DEFAULT', 13 'ANY', 14 'call', 15 'create_autospec', 16 'AsyncMock', 17 'FILTER_DIR', 18 'NonCallableMock', 19 'NonCallableMagicMock', 20 'mock_open', 21 'PropertyMock', 22 'seal', 23) 24 25 26__version__ = '1.0' 27 28import asyncio 29import contextlib 30import io 31import inspect 32import pprint 33import sys 34import builtins 35from types import CodeType, ModuleType, MethodType 36from unittest.util import safe_repr 37from functools import wraps, partial 38 39 40_builtins = {name for name in dir(builtins) if not name.startswith('_')} 41 42FILTER_DIR = True 43 44# Workaround for issue #12370 45# Without this, the __class__ properties wouldn't be set correctly 46_safe_super = super 47 48def _is_async_obj(obj): 49 if _is_instance_mock(obj) and not isinstance(obj, AsyncMock): 50 return False 51 return asyncio.iscoroutinefunction(obj) or inspect.isawaitable(obj) 52 53 54def _is_async_func(func): 55 if getattr(func, '__code__', None): 56 return asyncio.iscoroutinefunction(func) 57 else: 58 return False 59 60 61def _is_instance_mock(obj): 62 # can't use isinstance on Mock objects because they override __class__ 63 # The base class for all mocks is NonCallableMock 64 return issubclass(type(obj), NonCallableMock) 65 66 67def _is_exception(obj): 68 return ( 69 isinstance(obj, BaseException) or 70 isinstance(obj, type) and issubclass(obj, BaseException) 71 ) 72 73 74def _extract_mock(obj): 75 # Autospecced functions will return a FunctionType with "mock" attribute 76 # which is the actual mock object that needs to be used. 77 if isinstance(obj, FunctionTypes) and hasattr(obj, 'mock'): 78 return obj.mock 79 else: 80 return obj 81 82 83def _get_signature_object(func, as_instance, eat_self): 84 """ 85 Given an arbitrary, possibly callable object, try to create a suitable 86 signature object. 87 Return a (reduced func, signature) tuple, or None. 88 """ 89 if isinstance(func, type) and not as_instance: 90 # If it's a type and should be modelled as a type, use __init__. 91 func = func.__init__ 92 # Skip the `self` argument in __init__ 93 eat_self = True 94 elif not isinstance(func, FunctionTypes): 95 # If we really want to model an instance of the passed type, 96 # __call__ should be looked up, not __init__. 97 try: 98 func = func.__call__ 99 except AttributeError: 100 return None 101 if eat_self: 102 sig_func = partial(func, None) 103 else: 104 sig_func = func 105 try: 106 return func, inspect.signature(sig_func) 107 except ValueError: 108 # Certain callable types are not supported by inspect.signature() 109 return None 110 111 112def _check_signature(func, mock, skipfirst, instance=False): 113 sig = _get_signature_object(func, instance, skipfirst) 114 if sig is None: 115 return 116 func, sig = sig 117 def checksig(self, /, *args, **kwargs): 118 sig.bind(*args, **kwargs) 119 _copy_func_details(func, checksig) 120 type(mock)._mock_check_sig = checksig 121 type(mock).__signature__ = sig 122 123 124def _copy_func_details(func, funcopy): 125 # we explicitly don't copy func.__dict__ into this copy as it would 126 # expose original attributes that should be mocked 127 for attribute in ( 128 '__name__', '__doc__', '__text_signature__', 129 '__module__', '__defaults__', '__kwdefaults__', 130 ): 131 try: 132 setattr(funcopy, attribute, getattr(func, attribute)) 133 except AttributeError: 134 pass 135 136 137def _callable(obj): 138 if isinstance(obj, type): 139 return True 140 if isinstance(obj, (staticmethod, classmethod, MethodType)): 141 return _callable(obj.__func__) 142 if getattr(obj, '__call__', None) is not None: 143 return True 144 return False 145 146 147def _is_list(obj): 148 # checks for list or tuples 149 # XXXX badly named! 150 return type(obj) in (list, tuple) 151 152 153def _instance_callable(obj): 154 """Given an object, return True if the object is callable. 155 For classes, return True if instances would be callable.""" 156 if not isinstance(obj, type): 157 # already an instance 158 return getattr(obj, '__call__', None) is not None 159 160 # *could* be broken by a class overriding __mro__ or __dict__ via 161 # a metaclass 162 for base in (obj,) + obj.__mro__: 163 if base.__dict__.get('__call__') is not None: 164 return True 165 return False 166 167 168def _set_signature(mock, original, instance=False): 169 # creates a function with signature (*args, **kwargs) that delegates to a 170 # mock. It still does signature checking by calling a lambda with the same 171 # signature as the original. 172 173 skipfirst = isinstance(original, type) 174 result = _get_signature_object(original, instance, skipfirst) 175 if result is None: 176 return mock 177 func, sig = result 178 def checksig(*args, **kwargs): 179 sig.bind(*args, **kwargs) 180 _copy_func_details(func, checksig) 181 182 name = original.__name__ 183 if not name.isidentifier(): 184 name = 'funcopy' 185 context = {'_checksig_': checksig, 'mock': mock} 186 src = """def %s(*args, **kwargs): 187 _checksig_(*args, **kwargs) 188 return mock(*args, **kwargs)""" % name 189 exec (src, context) 190 funcopy = context[name] 191 _setup_func(funcopy, mock, sig) 192 return funcopy 193 194 195def _setup_func(funcopy, mock, sig): 196 funcopy.mock = mock 197 198 def assert_called_with(*args, **kwargs): 199 return mock.assert_called_with(*args, **kwargs) 200 def assert_called(*args, **kwargs): 201 return mock.assert_called(*args, **kwargs) 202 def assert_not_called(*args, **kwargs): 203 return mock.assert_not_called(*args, **kwargs) 204 def assert_called_once(*args, **kwargs): 205 return mock.assert_called_once(*args, **kwargs) 206 def assert_called_once_with(*args, **kwargs): 207 return mock.assert_called_once_with(*args, **kwargs) 208 def assert_has_calls(*args, **kwargs): 209 return mock.assert_has_calls(*args, **kwargs) 210 def assert_any_call(*args, **kwargs): 211 return mock.assert_any_call(*args, **kwargs) 212 def reset_mock(): 213 funcopy.method_calls = _CallList() 214 funcopy.mock_calls = _CallList() 215 mock.reset_mock() 216 ret = funcopy.return_value 217 if _is_instance_mock(ret) and not ret is mock: 218 ret.reset_mock() 219 220 funcopy.called = False 221 funcopy.call_count = 0 222 funcopy.call_args = None 223 funcopy.call_args_list = _CallList() 224 funcopy.method_calls = _CallList() 225 funcopy.mock_calls = _CallList() 226 227 funcopy.return_value = mock.return_value 228 funcopy.side_effect = mock.side_effect 229 funcopy._mock_children = mock._mock_children 230 231 funcopy.assert_called_with = assert_called_with 232 funcopy.assert_called_once_with = assert_called_once_with 233 funcopy.assert_has_calls = assert_has_calls 234 funcopy.assert_any_call = assert_any_call 235 funcopy.reset_mock = reset_mock 236 funcopy.assert_called = assert_called 237 funcopy.assert_not_called = assert_not_called 238 funcopy.assert_called_once = assert_called_once 239 funcopy.__signature__ = sig 240 241 mock._mock_delegate = funcopy 242 243 244def _setup_async_mock(mock): 245 mock._is_coroutine = asyncio.coroutines._is_coroutine 246 mock.await_count = 0 247 mock.await_args = None 248 mock.await_args_list = _CallList() 249 250 # Mock is not configured yet so the attributes are set 251 # to a function and then the corresponding mock helper function 252 # is called when the helper is accessed similar to _setup_func. 253 def wrapper(attr, /, *args, **kwargs): 254 return getattr(mock.mock, attr)(*args, **kwargs) 255 256 for attribute in ('assert_awaited', 257 'assert_awaited_once', 258 'assert_awaited_with', 259 'assert_awaited_once_with', 260 'assert_any_await', 261 'assert_has_awaits', 262 'assert_not_awaited'): 263 264 # setattr(mock, attribute, wrapper) causes late binding 265 # hence attribute will always be the last value in the loop 266 # Use partial(wrapper, attribute) to ensure the attribute is bound 267 # correctly. 268 setattr(mock, attribute, partial(wrapper, attribute)) 269 270 271def _is_magic(name): 272 return '__%s__' % name[2:-2] == name 273 274 275class _SentinelObject(object): 276 "A unique, named, sentinel object." 277 def __init__(self, name): 278 self.name = name 279 280 def __repr__(self): 281 return 'sentinel.%s' % self.name 282 283 def __reduce__(self): 284 return 'sentinel.%s' % self.name 285 286 287class _Sentinel(object): 288 """Access attributes to return a named object, usable as a sentinel.""" 289 def __init__(self): 290 self._sentinels = {} 291 292 def __getattr__(self, name): 293 if name == '__bases__': 294 # Without this help(unittest.mock) raises an exception 295 raise AttributeError 296 return self._sentinels.setdefault(name, _SentinelObject(name)) 297 298 def __reduce__(self): 299 return 'sentinel' 300 301 302sentinel = _Sentinel() 303 304DEFAULT = sentinel.DEFAULT 305_missing = sentinel.MISSING 306_deleted = sentinel.DELETED 307 308 309_allowed_names = { 310 'return_value', '_mock_return_value', 'side_effect', 311 '_mock_side_effect', '_mock_parent', '_mock_new_parent', 312 '_mock_name', '_mock_new_name' 313} 314 315 316def _delegating_property(name): 317 _allowed_names.add(name) 318 _the_name = '_mock_' + name 319 def _get(self, name=name, _the_name=_the_name): 320 sig = self._mock_delegate 321 if sig is None: 322 return getattr(self, _the_name) 323 return getattr(sig, name) 324 def _set(self, value, name=name, _the_name=_the_name): 325 sig = self._mock_delegate 326 if sig is None: 327 self.__dict__[_the_name] = value 328 else: 329 setattr(sig, name, value) 330 331 return property(_get, _set) 332 333 334 335class _CallList(list): 336 337 def __contains__(self, value): 338 if not isinstance(value, list): 339 return list.__contains__(self, value) 340 len_value = len(value) 341 len_self = len(self) 342 if len_value > len_self: 343 return False 344 345 for i in range(0, len_self - len_value + 1): 346 sub_list = self[i:i+len_value] 347 if sub_list == value: 348 return True 349 return False 350 351 def __repr__(self): 352 return pprint.pformat(list(self)) 353 354 355def _check_and_set_parent(parent, value, name, new_name): 356 value = _extract_mock(value) 357 358 if not _is_instance_mock(value): 359 return False 360 if ((value._mock_name or value._mock_new_name) or 361 (value._mock_parent is not None) or 362 (value._mock_new_parent is not None)): 363 return False 364 365 _parent = parent 366 while _parent is not None: 367 # setting a mock (value) as a child or return value of itself 368 # should not modify the mock 369 if _parent is value: 370 return False 371 _parent = _parent._mock_new_parent 372 373 if new_name: 374 value._mock_new_parent = parent 375 value._mock_new_name = new_name 376 if name: 377 value._mock_parent = parent 378 value._mock_name = name 379 return True 380 381# Internal class to identify if we wrapped an iterator object or not. 382class _MockIter(object): 383 def __init__(self, obj): 384 self.obj = iter(obj) 385 def __next__(self): 386 return next(self.obj) 387 388class Base(object): 389 _mock_return_value = DEFAULT 390 _mock_side_effect = None 391 def __init__(self, /, *args, **kwargs): 392 pass 393 394 395 396class NonCallableMock(Base): 397 """A non-callable version of `Mock`""" 398 399 def __new__(cls, /, *args, **kw): 400 # every instance has its own class 401 # so we can create magic methods on the 402 # class without stomping on other mocks 403 bases = (cls,) 404 if not issubclass(cls, AsyncMock): 405 # Check if spec is an async object or function 406 sig = inspect.signature(NonCallableMock.__init__) 407 bound_args = sig.bind_partial(cls, *args, **kw).arguments 408 spec_arg = [ 409 arg for arg in bound_args.keys() 410 if arg.startswith('spec') 411 ] 412 if spec_arg: 413 # what if spec_set is different than spec? 414 if _is_async_obj(bound_args[spec_arg[0]]): 415 bases = (AsyncMockMixin, cls,) 416 new = type(cls.__name__, bases, {'__doc__': cls.__doc__}) 417 instance = _safe_super(NonCallableMock, cls).__new__(new) 418 return instance 419 420 421 def __init__( 422 self, spec=None, wraps=None, name=None, spec_set=None, 423 parent=None, _spec_state=None, _new_name='', _new_parent=None, 424 _spec_as_instance=False, _eat_self=None, unsafe=False, **kwargs 425 ): 426 if _new_parent is None: 427 _new_parent = parent 428 429 __dict__ = self.__dict__ 430 __dict__['_mock_parent'] = parent 431 __dict__['_mock_name'] = name 432 __dict__['_mock_new_name'] = _new_name 433 __dict__['_mock_new_parent'] = _new_parent 434 __dict__['_mock_sealed'] = False 435 436 if spec_set is not None: 437 spec = spec_set 438 spec_set = True 439 if _eat_self is None: 440 _eat_self = parent is not None 441 442 self._mock_add_spec(spec, spec_set, _spec_as_instance, _eat_self) 443 444 __dict__['_mock_children'] = {} 445 __dict__['_mock_wraps'] = wraps 446 __dict__['_mock_delegate'] = None 447 448 __dict__['_mock_called'] = False 449 __dict__['_mock_call_args'] = None 450 __dict__['_mock_call_count'] = 0 451 __dict__['_mock_call_args_list'] = _CallList() 452 __dict__['_mock_mock_calls'] = _CallList() 453 454 __dict__['method_calls'] = _CallList() 455 __dict__['_mock_unsafe'] = unsafe 456 457 if kwargs: 458 self.configure_mock(**kwargs) 459 460 _safe_super(NonCallableMock, self).__init__( 461 spec, wraps, name, spec_set, parent, 462 _spec_state 463 ) 464 465 466 def attach_mock(self, mock, attribute): 467 """ 468 Attach a mock as an attribute of this one, replacing its name and 469 parent. Calls to the attached mock will be recorded in the 470 `method_calls` and `mock_calls` attributes of this one.""" 471 inner_mock = _extract_mock(mock) 472 473 inner_mock._mock_parent = None 474 inner_mock._mock_new_parent = None 475 inner_mock._mock_name = '' 476 inner_mock._mock_new_name = None 477 478 setattr(self, attribute, mock) 479 480 481 def mock_add_spec(self, spec, spec_set=False): 482 """Add a spec to a mock. `spec` can either be an object or a 483 list of strings. Only attributes on the `spec` can be fetched as 484 attributes from the mock. 485 486 If `spec_set` is True then only attributes on the spec can be set.""" 487 self._mock_add_spec(spec, spec_set) 488 489 490 def _mock_add_spec(self, spec, spec_set, _spec_as_instance=False, 491 _eat_self=False): 492 _spec_class = None 493 _spec_signature = None 494 _spec_asyncs = [] 495 496 for attr in dir(spec): 497 if asyncio.iscoroutinefunction(getattr(spec, attr, None)): 498 _spec_asyncs.append(attr) 499 500 if spec is not None and not _is_list(spec): 501 if isinstance(spec, type): 502 _spec_class = spec 503 else: 504 _spec_class = type(spec) 505 res = _get_signature_object(spec, 506 _spec_as_instance, _eat_self) 507 _spec_signature = res and res[1] 508 509 spec = dir(spec) 510 511 __dict__ = self.__dict__ 512 __dict__['_spec_class'] = _spec_class 513 __dict__['_spec_set'] = spec_set 514 __dict__['_spec_signature'] = _spec_signature 515 __dict__['_mock_methods'] = spec 516 __dict__['_spec_asyncs'] = _spec_asyncs 517 518 def __get_return_value(self): 519 ret = self._mock_return_value 520 if self._mock_delegate is not None: 521 ret = self._mock_delegate.return_value 522 523 if ret is DEFAULT: 524 ret = self._get_child_mock( 525 _new_parent=self, _new_name='()' 526 ) 527 self.return_value = ret 528 return ret 529 530 531 def __set_return_value(self, value): 532 if self._mock_delegate is not None: 533 self._mock_delegate.return_value = value 534 else: 535 self._mock_return_value = value 536 _check_and_set_parent(self, value, None, '()') 537 538 __return_value_doc = "The value to be returned when the mock is called." 539 return_value = property(__get_return_value, __set_return_value, 540 __return_value_doc) 541 542 543 @property 544 def __class__(self): 545 if self._spec_class is None: 546 return type(self) 547 return self._spec_class 548 549 called = _delegating_property('called') 550 call_count = _delegating_property('call_count') 551 call_args = _delegating_property('call_args') 552 call_args_list = _delegating_property('call_args_list') 553 mock_calls = _delegating_property('mock_calls') 554 555 556 def __get_side_effect(self): 557 delegated = self._mock_delegate 558 if delegated is None: 559 return self._mock_side_effect 560 sf = delegated.side_effect 561 if (sf is not None and not callable(sf) 562 and not isinstance(sf, _MockIter) and not _is_exception(sf)): 563 sf = _MockIter(sf) 564 delegated.side_effect = sf 565 return sf 566 567 def __set_side_effect(self, value): 568 value = _try_iter(value) 569 delegated = self._mock_delegate 570 if delegated is None: 571 self._mock_side_effect = value 572 else: 573 delegated.side_effect = value 574 575 side_effect = property(__get_side_effect, __set_side_effect) 576 577 578 def reset_mock(self, visited=None,*, return_value=False, side_effect=False): 579 "Restore the mock object to its initial state." 580 if visited is None: 581 visited = [] 582 if id(self) in visited: 583 return 584 visited.append(id(self)) 585 586 self.called = False 587 self.call_args = None 588 self.call_count = 0 589 self.mock_calls = _CallList() 590 self.call_args_list = _CallList() 591 self.method_calls = _CallList() 592 593 if return_value: 594 self._mock_return_value = DEFAULT 595 if side_effect: 596 self._mock_side_effect = None 597 598 for child in self._mock_children.values(): 599 if isinstance(child, _SpecState) or child is _deleted: 600 continue 601 child.reset_mock(visited) 602 603 ret = self._mock_return_value 604 if _is_instance_mock(ret) and ret is not self: 605 ret.reset_mock(visited) 606 607 608 def configure_mock(self, /, **kwargs): 609 """Set attributes on the mock through keyword arguments. 610 611 Attributes plus return values and side effects can be set on child 612 mocks using standard dot notation and unpacking a dictionary in the 613 method call: 614 615 >>> attrs = {'method.return_value': 3, 'other.side_effect': KeyError} 616 >>> mock.configure_mock(**attrs)""" 617 for arg, val in sorted(kwargs.items(), 618 # we sort on the number of dots so that 619 # attributes are set before we set attributes on 620 # attributes 621 key=lambda entry: entry[0].count('.')): 622 args = arg.split('.') 623 final = args.pop() 624 obj = self 625 for entry in args: 626 obj = getattr(obj, entry) 627 setattr(obj, final, val) 628 629 630 def __getattr__(self, name): 631 if name in {'_mock_methods', '_mock_unsafe'}: 632 raise AttributeError(name) 633 elif self._mock_methods is not None: 634 if name not in self._mock_methods or name in _all_magics: 635 raise AttributeError("Mock object has no attribute %r" % name) 636 elif _is_magic(name): 637 raise AttributeError(name) 638 if not self._mock_unsafe: 639 if name.startswith(('assert', 'assret')): 640 raise AttributeError("Attributes cannot start with 'assert' " 641 "or 'assret'") 642 643 result = self._mock_children.get(name) 644 if result is _deleted: 645 raise AttributeError(name) 646 elif result is None: 647 wraps = None 648 if self._mock_wraps is not None: 649 # XXXX should we get the attribute without triggering code 650 # execution? 651 wraps = getattr(self._mock_wraps, name) 652 653 result = self._get_child_mock( 654 parent=self, name=name, wraps=wraps, _new_name=name, 655 _new_parent=self 656 ) 657 self._mock_children[name] = result 658 659 elif isinstance(result, _SpecState): 660 result = create_autospec( 661 result.spec, result.spec_set, result.instance, 662 result.parent, result.name 663 ) 664 self._mock_children[name] = result 665 666 return result 667 668 669 def _extract_mock_name(self): 670 _name_list = [self._mock_new_name] 671 _parent = self._mock_new_parent 672 last = self 673 674 dot = '.' 675 if _name_list == ['()']: 676 dot = '' 677 678 while _parent is not None: 679 last = _parent 680 681 _name_list.append(_parent._mock_new_name + dot) 682 dot = '.' 683 if _parent._mock_new_name == '()': 684 dot = '' 685 686 _parent = _parent._mock_new_parent 687 688 _name_list = list(reversed(_name_list)) 689 _first = last._mock_name or 'mock' 690 if len(_name_list) > 1: 691 if _name_list[1] not in ('()', '().'): 692 _first += '.' 693 _name_list[0] = _first 694 return ''.join(_name_list) 695 696 def __repr__(self): 697 name = self._extract_mock_name() 698 699 name_string = '' 700 if name not in ('mock', 'mock.'): 701 name_string = ' name=%r' % name 702 703 spec_string = '' 704 if self._spec_class is not None: 705 spec_string = ' spec=%r' 706 if self._spec_set: 707 spec_string = ' spec_set=%r' 708 spec_string = spec_string % self._spec_class.__name__ 709 return "<%s%s%s id='%s'>" % ( 710 type(self).__name__, 711 name_string, 712 spec_string, 713 id(self) 714 ) 715 716 717 def __dir__(self): 718 """Filter the output of `dir(mock)` to only useful members.""" 719 if not FILTER_DIR: 720 return object.__dir__(self) 721 722 extras = self._mock_methods or [] 723 from_type = dir(type(self)) 724 from_dict = list(self.__dict__) 725 from_child_mocks = [ 726 m_name for m_name, m_value in self._mock_children.items() 727 if m_value is not _deleted] 728 729 from_type = [e for e in from_type if not e.startswith('_')] 730 from_dict = [e for e in from_dict if not e.startswith('_') or 731 _is_magic(e)] 732 return sorted(set(extras + from_type + from_dict + from_child_mocks)) 733 734 735 def __setattr__(self, name, value): 736 if name in _allowed_names: 737 # property setters go through here 738 return object.__setattr__(self, name, value) 739 elif (self._spec_set and self._mock_methods is not None and 740 name not in self._mock_methods and 741 name not in self.__dict__): 742 raise AttributeError("Mock object has no attribute '%s'" % name) 743 elif name in _unsupported_magics: 744 msg = 'Attempting to set unsupported magic method %r.' % name 745 raise AttributeError(msg) 746 elif name in _all_magics: 747 if self._mock_methods is not None and name not in self._mock_methods: 748 raise AttributeError("Mock object has no attribute '%s'" % name) 749 750 if not _is_instance_mock(value): 751 setattr(type(self), name, _get_method(name, value)) 752 original = value 753 value = lambda *args, **kw: original(self, *args, **kw) 754 else: 755 # only set _new_name and not name so that mock_calls is tracked 756 # but not method calls 757 _check_and_set_parent(self, value, None, name) 758 setattr(type(self), name, value) 759 self._mock_children[name] = value 760 elif name == '__class__': 761 self._spec_class = value 762 return 763 else: 764 if _check_and_set_parent(self, value, name, name): 765 self._mock_children[name] = value 766 767 if self._mock_sealed and not hasattr(self, name): 768 mock_name = f'{self._extract_mock_name()}.{name}' 769 raise AttributeError(f'Cannot set {mock_name}') 770 771 return object.__setattr__(self, name, value) 772 773 774 def __delattr__(self, name): 775 if name in _all_magics and name in type(self).__dict__: 776 delattr(type(self), name) 777 if name not in self.__dict__: 778 # for magic methods that are still MagicProxy objects and 779 # not set on the instance itself 780 return 781 782 obj = self._mock_children.get(name, _missing) 783 if name in self.__dict__: 784 _safe_super(NonCallableMock, self).__delattr__(name) 785 elif obj is _deleted: 786 raise AttributeError(name) 787 if obj is not _missing: 788 del self._mock_children[name] 789 self._mock_children[name] = _deleted 790 791 792 def _format_mock_call_signature(self, args, kwargs): 793 name = self._mock_name or 'mock' 794 return _format_call_signature(name, args, kwargs) 795 796 797 def _format_mock_failure_message(self, args, kwargs, action='call'): 798 message = 'expected %s not found.\nExpected: %s\nActual: %s' 799 expected_string = self._format_mock_call_signature(args, kwargs) 800 call_args = self.call_args 801 actual_string = self._format_mock_call_signature(*call_args) 802 return message % (action, expected_string, actual_string) 803 804 805 def _get_call_signature_from_name(self, name): 806 """ 807 * If call objects are asserted against a method/function like obj.meth1 808 then there could be no name for the call object to lookup. Hence just 809 return the spec_signature of the method/function being asserted against. 810 * If the name is not empty then remove () and split by '.' to get 811 list of names to iterate through the children until a potential 812 match is found. A child mock is created only during attribute access 813 so if we get a _SpecState then no attributes of the spec were accessed 814 and can be safely exited. 815 """ 816 if not name: 817 return self._spec_signature 818 819 sig = None 820 names = name.replace('()', '').split('.') 821 children = self._mock_children 822 823 for name in names: 824 child = children.get(name) 825 if child is None or isinstance(child, _SpecState): 826 break 827 else: 828 children = child._mock_children 829 sig = child._spec_signature 830 831 return sig 832 833 834 def _call_matcher(self, _call): 835 """ 836 Given a call (or simply an (args, kwargs) tuple), return a 837 comparison key suitable for matching with other calls. 838 This is a best effort method which relies on the spec's signature, 839 if available, or falls back on the arguments themselves. 840 """ 841 842 if isinstance(_call, tuple) and len(_call) > 2: 843 sig = self._get_call_signature_from_name(_call[0]) 844 else: 845 sig = self._spec_signature 846 847 if sig is not None: 848 if len(_call) == 2: 849 name = '' 850 args, kwargs = _call 851 else: 852 name, args, kwargs = _call 853 try: 854 return name, sig.bind(*args, **kwargs) 855 except TypeError as e: 856 return e.with_traceback(None) 857 else: 858 return _call 859 860 def assert_not_called(self): 861 """assert that the mock was never called. 862 """ 863 if self.call_count != 0: 864 msg = ("Expected '%s' to not have been called. Called %s times.%s" 865 % (self._mock_name or 'mock', 866 self.call_count, 867 self._calls_repr())) 868 raise AssertionError(msg) 869 870 def assert_called(self): 871 """assert that the mock was called at least once 872 """ 873 if self.call_count == 0: 874 msg = ("Expected '%s' to have been called." % 875 (self._mock_name or 'mock')) 876 raise AssertionError(msg) 877 878 def assert_called_once(self): 879 """assert that the mock was called only once. 880 """ 881 if not self.call_count == 1: 882 msg = ("Expected '%s' to have been called once. Called %s times.%s" 883 % (self._mock_name or 'mock', 884 self.call_count, 885 self._calls_repr())) 886 raise AssertionError(msg) 887 888 def assert_called_with(self, /, *args, **kwargs): 889 """assert that the last call was made with the specified arguments. 890 891 Raises an AssertionError if the args and keyword args passed in are 892 different to the last call to the mock.""" 893 if self.call_args is None: 894 expected = self._format_mock_call_signature(args, kwargs) 895 actual = 'not called.' 896 error_message = ('expected call not found.\nExpected: %s\nActual: %s' 897 % (expected, actual)) 898 raise AssertionError(error_message) 899 900 def _error_message(): 901 msg = self._format_mock_failure_message(args, kwargs) 902 return msg 903 expected = self._call_matcher((args, kwargs)) 904 actual = self._call_matcher(self.call_args) 905 if expected != actual: 906 cause = expected if isinstance(expected, Exception) else None 907 raise AssertionError(_error_message()) from cause 908 909 910 def assert_called_once_with(self, /, *args, **kwargs): 911 """assert that the mock was called exactly once and that that call was 912 with the specified arguments.""" 913 if not self.call_count == 1: 914 msg = ("Expected '%s' to be called once. Called %s times.%s" 915 % (self._mock_name or 'mock', 916 self.call_count, 917 self._calls_repr())) 918 raise AssertionError(msg) 919 return self.assert_called_with(*args, **kwargs) 920 921 922 def assert_has_calls(self, calls, any_order=False): 923 """assert the mock has been called with the specified calls. 924 The `mock_calls` list is checked for the calls. 925 926 If `any_order` is False (the default) then the calls must be 927 sequential. There can be extra calls before or after the 928 specified calls. 929 930 If `any_order` is True then the calls can be in any order, but 931 they must all appear in `mock_calls`.""" 932 expected = [self._call_matcher(c) for c in calls] 933 cause = next((e for e in expected if isinstance(e, Exception)), None) 934 all_calls = _CallList(self._call_matcher(c) for c in self.mock_calls) 935 if not any_order: 936 if expected not in all_calls: 937 if cause is None: 938 problem = 'Calls not found.' 939 else: 940 problem = ('Error processing expected calls.\n' 941 'Errors: {}').format( 942 [e if isinstance(e, Exception) else None 943 for e in expected]) 944 raise AssertionError( 945 f'{problem}\n' 946 f'Expected: {_CallList(calls)}' 947 f'{self._calls_repr(prefix="Actual").rstrip(".")}' 948 ) from cause 949 return 950 951 all_calls = list(all_calls) 952 953 not_found = [] 954 for kall in expected: 955 try: 956 all_calls.remove(kall) 957 except ValueError: 958 not_found.append(kall) 959 if not_found: 960 raise AssertionError( 961 '%r does not contain all of %r in its call list, ' 962 'found %r instead' % (self._mock_name or 'mock', 963 tuple(not_found), all_calls) 964 ) from cause 965 966 967 def assert_any_call(self, /, *args, **kwargs): 968 """assert the mock has been called with the specified arguments. 969 970 The assert passes if the mock has *ever* been called, unlike 971 `assert_called_with` and `assert_called_once_with` that only pass if 972 the call is the most recent one.""" 973 expected = self._call_matcher((args, kwargs)) 974 actual = [self._call_matcher(c) for c in self.call_args_list] 975 if expected not in actual: 976 cause = expected if isinstance(expected, Exception) else None 977 expected_string = self._format_mock_call_signature(args, kwargs) 978 raise AssertionError( 979 '%s call not found' % expected_string 980 ) from cause 981 982 983 def _get_child_mock(self, /, **kw): 984 """Create the child mocks for attributes and return value. 985 By default child mocks will be the same type as the parent. 986 Subclasses of Mock may want to override this to customize the way 987 child mocks are made. 988 989 For non-callable mocks the callable variant will be used (rather than 990 any custom subclass).""" 991 _new_name = kw.get("_new_name") 992 if _new_name in self.__dict__['_spec_asyncs']: 993 return AsyncMock(**kw) 994 995 _type = type(self) 996 if issubclass(_type, MagicMock) and _new_name in _async_method_magics: 997 # Any asynchronous magic becomes an AsyncMock 998 klass = AsyncMock 999 elif issubclass(_type, AsyncMockMixin): 1000 if (_new_name in _all_sync_magics or 1001 self._mock_methods and _new_name in self._mock_methods): 1002 # Any synchronous method on AsyncMock becomes a MagicMock 1003 klass = MagicMock 1004 else: 1005 klass = AsyncMock 1006 elif not issubclass(_type, CallableMixin): 1007 if issubclass(_type, NonCallableMagicMock): 1008 klass = MagicMock 1009 elif issubclass(_type, NonCallableMock): 1010 klass = Mock 1011 else: 1012 klass = _type.__mro__[1] 1013 1014 if self._mock_sealed: 1015 attribute = "." + kw["name"] if "name" in kw else "()" 1016 mock_name = self._extract_mock_name() + attribute 1017 raise AttributeError(mock_name) 1018 1019 return klass(**kw) 1020 1021 1022 def _calls_repr(self, prefix="Calls"): 1023 """Renders self.mock_calls as a string. 1024 1025 Example: "\nCalls: [call(1), call(2)]." 1026 1027 If self.mock_calls is empty, an empty string is returned. The 1028 output will be truncated if very long. 1029 """ 1030 if not self.mock_calls: 1031 return "" 1032 return f"\n{prefix}: {safe_repr(self.mock_calls)}." 1033 1034 1035 1036def _try_iter(obj): 1037 if obj is None: 1038 return obj 1039 if _is_exception(obj): 1040 return obj 1041 if _callable(obj): 1042 return obj 1043 try: 1044 return iter(obj) 1045 except TypeError: 1046 # XXXX backwards compatibility 1047 # but this will blow up on first call - so maybe we should fail early? 1048 return obj 1049 1050 1051class CallableMixin(Base): 1052 1053 def __init__(self, spec=None, side_effect=None, return_value=DEFAULT, 1054 wraps=None, name=None, spec_set=None, parent=None, 1055 _spec_state=None, _new_name='', _new_parent=None, **kwargs): 1056 self.__dict__['_mock_return_value'] = return_value 1057 _safe_super(CallableMixin, self).__init__( 1058 spec, wraps, name, spec_set, parent, 1059 _spec_state, _new_name, _new_parent, **kwargs 1060 ) 1061 1062 self.side_effect = side_effect 1063 1064 1065 def _mock_check_sig(self, /, *args, **kwargs): 1066 # stub method that can be replaced with one with a specific signature 1067 pass 1068 1069 1070 def __call__(self, /, *args, **kwargs): 1071 # can't use self in-case a function / method we are mocking uses self 1072 # in the signature 1073 self._mock_check_sig(*args, **kwargs) 1074 self._increment_mock_call(*args, **kwargs) 1075 return self._mock_call(*args, **kwargs) 1076 1077 1078 def _mock_call(self, /, *args, **kwargs): 1079 return self._execute_mock_call(*args, **kwargs) 1080 1081 def _increment_mock_call(self, /, *args, **kwargs): 1082 self.called = True 1083 self.call_count += 1 1084 1085 # handle call_args 1086 # needs to be set here so assertions on call arguments pass before 1087 # execution in the case of awaited calls 1088 _call = _Call((args, kwargs), two=True) 1089 self.call_args = _call 1090 self.call_args_list.append(_call) 1091 1092 # initial stuff for method_calls: 1093 do_method_calls = self._mock_parent is not None 1094 method_call_name = self._mock_name 1095 1096 # initial stuff for mock_calls: 1097 mock_call_name = self._mock_new_name 1098 is_a_call = mock_call_name == '()' 1099 self.mock_calls.append(_Call(('', args, kwargs))) 1100 1101 # follow up the chain of mocks: 1102 _new_parent = self._mock_new_parent 1103 while _new_parent is not None: 1104 1105 # handle method_calls: 1106 if do_method_calls: 1107 _new_parent.method_calls.append(_Call((method_call_name, args, kwargs))) 1108 do_method_calls = _new_parent._mock_parent is not None 1109 if do_method_calls: 1110 method_call_name = _new_parent._mock_name + '.' + method_call_name 1111 1112 # handle mock_calls: 1113 this_mock_call = _Call((mock_call_name, args, kwargs)) 1114 _new_parent.mock_calls.append(this_mock_call) 1115 1116 if _new_parent._mock_new_name: 1117 if is_a_call: 1118 dot = '' 1119 else: 1120 dot = '.' 1121 is_a_call = _new_parent._mock_new_name == '()' 1122 mock_call_name = _new_parent._mock_new_name + dot + mock_call_name 1123 1124 # follow the parental chain: 1125 _new_parent = _new_parent._mock_new_parent 1126 1127 def _execute_mock_call(self, /, *args, **kwargs): 1128 # separate from _increment_mock_call so that awaited functions are 1129 # executed separately from their call, also AsyncMock overrides this method 1130 1131 effect = self.side_effect 1132 if effect is not None: 1133 if _is_exception(effect): 1134 raise effect 1135 elif not _callable(effect): 1136 result = next(effect) 1137 if _is_exception(result): 1138 raise result 1139 else: 1140 result = effect(*args, **kwargs) 1141 1142 if result is not DEFAULT: 1143 return result 1144 1145 if self._mock_return_value is not DEFAULT: 1146 return self.return_value 1147 1148 if self._mock_wraps is not None: 1149 return self._mock_wraps(*args, **kwargs) 1150 1151 return self.return_value 1152 1153 1154 1155class Mock(CallableMixin, NonCallableMock): 1156 """ 1157 Create a new `Mock` object. `Mock` takes several optional arguments 1158 that specify the behaviour of the Mock object: 1159 1160 * `spec`: This can be either a list of strings or an existing object (a 1161 class or instance) that acts as the specification for the mock object. If 1162 you pass in an object then a list of strings is formed by calling dir on 1163 the object (excluding unsupported magic attributes and methods). Accessing 1164 any attribute not in this list will raise an `AttributeError`. 1165 1166 If `spec` is an object (rather than a list of strings) then 1167 `mock.__class__` returns the class of the spec object. This allows mocks 1168 to pass `isinstance` tests. 1169 1170 * `spec_set`: A stricter variant of `spec`. If used, attempting to *set* 1171 or get an attribute on the mock that isn't on the object passed as 1172 `spec_set` will raise an `AttributeError`. 1173 1174 * `side_effect`: A function to be called whenever the Mock is called. See 1175 the `side_effect` attribute. Useful for raising exceptions or 1176 dynamically changing return values. The function is called with the same 1177 arguments as the mock, and unless it returns `DEFAULT`, the return 1178 value of this function is used as the return value. 1179 1180 If `side_effect` is an iterable then each call to the mock will return 1181 the next value from the iterable. If any of the members of the iterable 1182 are exceptions they will be raised instead of returned. 1183 1184 * `return_value`: The value returned when the mock is called. By default 1185 this is a new Mock (created on first access). See the 1186 `return_value` attribute. 1187 1188 * `wraps`: Item for the mock object to wrap. If `wraps` is not None then 1189 calling the Mock will pass the call through to the wrapped object 1190 (returning the real result). Attribute access on the mock will return a 1191 Mock object that wraps the corresponding attribute of the wrapped object 1192 (so attempting to access an attribute that doesn't exist will raise an 1193 `AttributeError`). 1194 1195 If the mock has an explicit `return_value` set then calls are not passed 1196 to the wrapped object and the `return_value` is returned instead. 1197 1198 * `name`: If the mock has a name then it will be used in the repr of the 1199 mock. This can be useful for debugging. The name is propagated to child 1200 mocks. 1201 1202 Mocks can also be called with arbitrary keyword arguments. These will be 1203 used to set attributes on the mock after it is created. 1204 """ 1205 1206 1207def _dot_lookup(thing, comp, import_path): 1208 try: 1209 return getattr(thing, comp) 1210 except AttributeError: 1211 __import__(import_path) 1212 return getattr(thing, comp) 1213 1214 1215def _importer(target): 1216 components = target.split('.') 1217 import_path = components.pop(0) 1218 thing = __import__(import_path) 1219 1220 for comp in components: 1221 import_path += ".%s" % comp 1222 thing = _dot_lookup(thing, comp, import_path) 1223 return thing 1224 1225 1226def _is_started(patcher): 1227 # XXXX horrible 1228 return hasattr(patcher, 'is_local') 1229 1230 1231class _patch(object): 1232 1233 attribute_name = None 1234 _active_patches = [] 1235 1236 def __init__( 1237 self, getter, attribute, new, spec, create, 1238 spec_set, autospec, new_callable, kwargs 1239 ): 1240 if new_callable is not None: 1241 if new is not DEFAULT: 1242 raise ValueError( 1243 "Cannot use 'new' and 'new_callable' together" 1244 ) 1245 if autospec is not None: 1246 raise ValueError( 1247 "Cannot use 'autospec' and 'new_callable' together" 1248 ) 1249 1250 self.getter = getter 1251 self.attribute = attribute 1252 self.new = new 1253 self.new_callable = new_callable 1254 self.spec = spec 1255 self.create = create 1256 self.has_local = False 1257 self.spec_set = spec_set 1258 self.autospec = autospec 1259 self.kwargs = kwargs 1260 self.additional_patchers = [] 1261 1262 1263 def copy(self): 1264 patcher = _patch( 1265 self.getter, self.attribute, self.new, self.spec, 1266 self.create, self.spec_set, 1267 self.autospec, self.new_callable, self.kwargs 1268 ) 1269 patcher.attribute_name = self.attribute_name 1270 patcher.additional_patchers = [ 1271 p.copy() for p in self.additional_patchers 1272 ] 1273 return patcher 1274 1275 1276 def __call__(self, func): 1277 if isinstance(func, type): 1278 return self.decorate_class(func) 1279 if inspect.iscoroutinefunction(func): 1280 return self.decorate_async_callable(func) 1281 return self.decorate_callable(func) 1282 1283 1284 def decorate_class(self, klass): 1285 for attr in dir(klass): 1286 if not attr.startswith(patch.TEST_PREFIX): 1287 continue 1288 1289 attr_value = getattr(klass, attr) 1290 if not hasattr(attr_value, "__call__"): 1291 continue 1292 1293 patcher = self.copy() 1294 setattr(klass, attr, patcher(attr_value)) 1295 return klass 1296 1297 1298 @contextlib.contextmanager 1299 def decoration_helper(self, patched, args, keywargs): 1300 extra_args = [] 1301 entered_patchers = [] 1302 patching = None 1303 1304 exc_info = tuple() 1305 try: 1306 for patching in patched.patchings: 1307 arg = patching.__enter__() 1308 entered_patchers.append(patching) 1309 if patching.attribute_name is not None: 1310 keywargs.update(arg) 1311 elif patching.new is DEFAULT: 1312 extra_args.append(arg) 1313 1314 args += tuple(extra_args) 1315 yield (args, keywargs) 1316 except: 1317 if (patching not in entered_patchers and 1318 _is_started(patching)): 1319 # the patcher may have been started, but an exception 1320 # raised whilst entering one of its additional_patchers 1321 entered_patchers.append(patching) 1322 # Pass the exception to __exit__ 1323 exc_info = sys.exc_info() 1324 # re-raise the exception 1325 raise 1326 finally: 1327 for patching in reversed(entered_patchers): 1328 patching.__exit__(*exc_info) 1329 1330 1331 def decorate_callable(self, func): 1332 # NB. Keep the method in sync with decorate_async_callable() 1333 if hasattr(func, 'patchings'): 1334 func.patchings.append(self) 1335 return func 1336 1337 @wraps(func) 1338 def patched(*args, **keywargs): 1339 with self.decoration_helper(patched, 1340 args, 1341 keywargs) as (newargs, newkeywargs): 1342 return func(*newargs, **newkeywargs) 1343 1344 patched.patchings = [self] 1345 return patched 1346 1347 1348 def decorate_async_callable(self, func): 1349 # NB. Keep the method in sync with decorate_callable() 1350 if hasattr(func, 'patchings'): 1351 func.patchings.append(self) 1352 return func 1353 1354 @wraps(func) 1355 async def patched(*args, **keywargs): 1356 with self.decoration_helper(patched, 1357 args, 1358 keywargs) as (newargs, newkeywargs): 1359 return await func(*newargs, **newkeywargs) 1360 1361 patched.patchings = [self] 1362 return patched 1363 1364 1365 def get_original(self): 1366 target = self.getter() 1367 name = self.attribute 1368 1369 original = DEFAULT 1370 local = False 1371 1372 try: 1373 original = target.__dict__[name] 1374 except (AttributeError, KeyError): 1375 original = getattr(target, name, DEFAULT) 1376 else: 1377 local = True 1378 1379 if name in _builtins and isinstance(target, ModuleType): 1380 self.create = True 1381 1382 if not self.create and original is DEFAULT: 1383 raise AttributeError( 1384 "%s does not have the attribute %r" % (target, name) 1385 ) 1386 return original, local 1387 1388 1389 def __enter__(self): 1390 """Perform the patch.""" 1391 new, spec, spec_set = self.new, self.spec, self.spec_set 1392 autospec, kwargs = self.autospec, self.kwargs 1393 new_callable = self.new_callable 1394 self.target = self.getter() 1395 1396 # normalise False to None 1397 if spec is False: 1398 spec = None 1399 if spec_set is False: 1400 spec_set = None 1401 if autospec is False: 1402 autospec = None 1403 1404 if spec is not None and autospec is not None: 1405 raise TypeError("Can't specify spec and autospec") 1406 if ((spec is not None or autospec is not None) and 1407 spec_set not in (True, None)): 1408 raise TypeError("Can't provide explicit spec_set *and* spec or autospec") 1409 1410 original, local = self.get_original() 1411 1412 if new is DEFAULT and autospec is None: 1413 inherit = False 1414 if spec is True: 1415 # set spec to the object we are replacing 1416 spec = original 1417 if spec_set is True: 1418 spec_set = original 1419 spec = None 1420 elif spec is not None: 1421 if spec_set is True: 1422 spec_set = spec 1423 spec = None 1424 elif spec_set is True: 1425 spec_set = original 1426 1427 if spec is not None or spec_set is not None: 1428 if original is DEFAULT: 1429 raise TypeError("Can't use 'spec' with create=True") 1430 if isinstance(original, type): 1431 # If we're patching out a class and there is a spec 1432 inherit = True 1433 if spec is None and _is_async_obj(original): 1434 Klass = AsyncMock 1435 else: 1436 Klass = MagicMock 1437 _kwargs = {} 1438 if new_callable is not None: 1439 Klass = new_callable 1440 elif spec is not None or spec_set is not None: 1441 this_spec = spec 1442 if spec_set is not None: 1443 this_spec = spec_set 1444 if _is_list(this_spec): 1445 not_callable = '__call__' not in this_spec 1446 else: 1447 not_callable = not callable(this_spec) 1448 if _is_async_obj(this_spec): 1449 Klass = AsyncMock 1450 elif not_callable: 1451 Klass = NonCallableMagicMock 1452 1453 if spec is not None: 1454 _kwargs['spec'] = spec 1455 if spec_set is not None: 1456 _kwargs['spec_set'] = spec_set 1457 1458 # add a name to mocks 1459 if (isinstance(Klass, type) and 1460 issubclass(Klass, NonCallableMock) and self.attribute): 1461 _kwargs['name'] = self.attribute 1462 1463 _kwargs.update(kwargs) 1464 new = Klass(**_kwargs) 1465 1466 if inherit and _is_instance_mock(new): 1467 # we can only tell if the instance should be callable if the 1468 # spec is not a list 1469 this_spec = spec 1470 if spec_set is not None: 1471 this_spec = spec_set 1472 if (not _is_list(this_spec) and not 1473 _instance_callable(this_spec)): 1474 Klass = NonCallableMagicMock 1475 1476 _kwargs.pop('name') 1477 new.return_value = Klass(_new_parent=new, _new_name='()', 1478 **_kwargs) 1479 elif autospec is not None: 1480 # spec is ignored, new *must* be default, spec_set is treated 1481 # as a boolean. Should we check spec is not None and that spec_set 1482 # is a bool? 1483 if new is not DEFAULT: 1484 raise TypeError( 1485 "autospec creates the mock for you. Can't specify " 1486 "autospec and new." 1487 ) 1488 if original is DEFAULT: 1489 raise TypeError("Can't use 'autospec' with create=True") 1490 spec_set = bool(spec_set) 1491 if autospec is True: 1492 autospec = original 1493 1494 new = create_autospec(autospec, spec_set=spec_set, 1495 _name=self.attribute, **kwargs) 1496 elif kwargs: 1497 # can't set keyword args when we aren't creating the mock 1498 # XXXX If new is a Mock we could call new.configure_mock(**kwargs) 1499 raise TypeError("Can't pass kwargs to a mock we aren't creating") 1500 1501 new_attr = new 1502 1503 self.temp_original = original 1504 self.is_local = local 1505 setattr(self.target, self.attribute, new_attr) 1506 if self.attribute_name is not None: 1507 extra_args = {} 1508 if self.new is DEFAULT: 1509 extra_args[self.attribute_name] = new 1510 for patching in self.additional_patchers: 1511 arg = patching.__enter__() 1512 if patching.new is DEFAULT: 1513 extra_args.update(arg) 1514 return extra_args 1515 1516 return new 1517 1518 1519 def __exit__(self, *exc_info): 1520 """Undo the patch.""" 1521 if not _is_started(self): 1522 return 1523 1524 if self.is_local and self.temp_original is not DEFAULT: 1525 setattr(self.target, self.attribute, self.temp_original) 1526 else: 1527 delattr(self.target, self.attribute) 1528 if not self.create and (not hasattr(self.target, self.attribute) or 1529 self.attribute in ('__doc__', '__module__', 1530 '__defaults__', '__annotations__', 1531 '__kwdefaults__')): 1532 # needed for proxy objects like django settings 1533 setattr(self.target, self.attribute, self.temp_original) 1534 1535 del self.temp_original 1536 del self.is_local 1537 del self.target 1538 for patcher in reversed(self.additional_patchers): 1539 if _is_started(patcher): 1540 patcher.__exit__(*exc_info) 1541 1542 1543 def start(self): 1544 """Activate a patch, returning any created mock.""" 1545 result = self.__enter__() 1546 self._active_patches.append(self) 1547 return result 1548 1549 1550 def stop(self): 1551 """Stop an active patch.""" 1552 try: 1553 self._active_patches.remove(self) 1554 except ValueError: 1555 # If the patch hasn't been started this will fail 1556 pass 1557 1558 return self.__exit__() 1559 1560 1561 1562def _get_target(target): 1563 try: 1564 target, attribute = target.rsplit('.', 1) 1565 except (TypeError, ValueError): 1566 raise TypeError("Need a valid target to patch. You supplied: %r" % 1567 (target,)) 1568 getter = lambda: _importer(target) 1569 return getter, attribute 1570 1571 1572def _patch_object( 1573 target, attribute, new=DEFAULT, spec=None, 1574 create=False, spec_set=None, autospec=None, 1575 new_callable=None, **kwargs 1576 ): 1577 """ 1578 patch the named member (`attribute`) on an object (`target`) with a mock 1579 object. 1580 1581 `patch.object` can be used as a decorator, class decorator or a context 1582 manager. Arguments `new`, `spec`, `create`, `spec_set`, 1583 `autospec` and `new_callable` have the same meaning as for `patch`. Like 1584 `patch`, `patch.object` takes arbitrary keyword arguments for configuring 1585 the mock object it creates. 1586 1587 When used as a class decorator `patch.object` honours `patch.TEST_PREFIX` 1588 for choosing which methods to wrap. 1589 """ 1590 if type(target) is str: 1591 raise TypeError( 1592 f"{target!r} must be the actual object to be patched, not a str" 1593 ) 1594 getter = lambda: target 1595 return _patch( 1596 getter, attribute, new, spec, create, 1597 spec_set, autospec, new_callable, kwargs 1598 ) 1599 1600 1601def _patch_multiple(target, spec=None, create=False, spec_set=None, 1602 autospec=None, new_callable=None, **kwargs): 1603 """Perform multiple patches in a single call. It takes the object to be 1604 patched (either as an object or a string to fetch the object by importing) 1605 and keyword arguments for the patches:: 1606 1607 with patch.multiple(settings, FIRST_PATCH='one', SECOND_PATCH='two'): 1608 ... 1609 1610 Use `DEFAULT` as the value if you want `patch.multiple` to create 1611 mocks for you. In this case the created mocks are passed into a decorated 1612 function by keyword, and a dictionary is returned when `patch.multiple` is 1613 used as a context manager. 1614 1615 `patch.multiple` can be used as a decorator, class decorator or a context 1616 manager. The arguments `spec`, `spec_set`, `create`, 1617 `autospec` and `new_callable` have the same meaning as for `patch`. These 1618 arguments will be applied to *all* patches done by `patch.multiple`. 1619 1620 When used as a class decorator `patch.multiple` honours `patch.TEST_PREFIX` 1621 for choosing which methods to wrap. 1622 """ 1623 if type(target) is str: 1624 getter = lambda: _importer(target) 1625 else: 1626 getter = lambda: target 1627 1628 if not kwargs: 1629 raise ValueError( 1630 'Must supply at least one keyword argument with patch.multiple' 1631 ) 1632 # need to wrap in a list for python 3, where items is a view 1633 items = list(kwargs.items()) 1634 attribute, new = items[0] 1635 patcher = _patch( 1636 getter, attribute, new, spec, create, spec_set, 1637 autospec, new_callable, {} 1638 ) 1639 patcher.attribute_name = attribute 1640 for attribute, new in items[1:]: 1641 this_patcher = _patch( 1642 getter, attribute, new, spec, create, spec_set, 1643 autospec, new_callable, {} 1644 ) 1645 this_patcher.attribute_name = attribute 1646 patcher.additional_patchers.append(this_patcher) 1647 return patcher 1648 1649 1650def patch( 1651 target, new=DEFAULT, spec=None, create=False, 1652 spec_set=None, autospec=None, new_callable=None, **kwargs 1653 ): 1654 """ 1655 `patch` acts as a function decorator, class decorator or a context 1656 manager. Inside the body of the function or with statement, the `target` 1657 is patched with a `new` object. When the function/with statement exits 1658 the patch is undone. 1659 1660 If `new` is omitted, then the target is replaced with an 1661 `AsyncMock if the patched object is an async function or a 1662 `MagicMock` otherwise. If `patch` is used as a decorator and `new` is 1663 omitted, the created mock is passed in as an extra argument to the 1664 decorated function. If `patch` is used as a context manager the created 1665 mock is returned by the context manager. 1666 1667 `target` should be a string in the form `'package.module.ClassName'`. The 1668 `target` is imported and the specified object replaced with the `new` 1669 object, so the `target` must be importable from the environment you are 1670 calling `patch` from. The target is imported when the decorated function 1671 is executed, not at decoration time. 1672 1673 The `spec` and `spec_set` keyword arguments are passed to the `MagicMock` 1674 if patch is creating one for you. 1675 1676 In addition you can pass `spec=True` or `spec_set=True`, which causes 1677 patch to pass in the object being mocked as the spec/spec_set object. 1678 1679 `new_callable` allows you to specify a different class, or callable object, 1680 that will be called to create the `new` object. By default `AsyncMock` is 1681 used for async functions and `MagicMock` for the rest. 1682 1683 A more powerful form of `spec` is `autospec`. If you set `autospec=True` 1684 then the mock will be created with a spec from the object being replaced. 1685 All attributes of the mock will also have the spec of the corresponding 1686 attribute of the object being replaced. Methods and functions being 1687 mocked will have their arguments checked and will raise a `TypeError` if 1688 they are called with the wrong signature. For mocks replacing a class, 1689 their return value (the 'instance') will have the same spec as the class. 1690 1691 Instead of `autospec=True` you can pass `autospec=some_object` to use an 1692 arbitrary object as the spec instead of the one being replaced. 1693 1694 By default `patch` will fail to replace attributes that don't exist. If 1695 you pass in `create=True`, and the attribute doesn't exist, patch will 1696 create the attribute for you when the patched function is called, and 1697 delete it again afterwards. This is useful for writing tests against 1698 attributes that your production code creates at runtime. It is off by 1699 default because it can be dangerous. With it switched on you can write 1700 passing tests against APIs that don't actually exist! 1701 1702 Patch can be used as a `TestCase` class decorator. It works by 1703 decorating each test method in the class. This reduces the boilerplate 1704 code when your test methods share a common patchings set. `patch` finds 1705 tests by looking for method names that start with `patch.TEST_PREFIX`. 1706 By default this is `test`, which matches the way `unittest` finds tests. 1707 You can specify an alternative prefix by setting `patch.TEST_PREFIX`. 1708 1709 Patch can be used as a context manager, with the with statement. Here the 1710 patching applies to the indented block after the with statement. If you 1711 use "as" then the patched object will be bound to the name after the 1712 "as"; very useful if `patch` is creating a mock object for you. 1713 1714 `patch` takes arbitrary keyword arguments. These will be passed to 1715 the `Mock` (or `new_callable`) on construction. 1716 1717 `patch.dict(...)`, `patch.multiple(...)` and `patch.object(...)` are 1718 available for alternate use-cases. 1719 """ 1720 getter, attribute = _get_target(target) 1721 return _patch( 1722 getter, attribute, new, spec, create, 1723 spec_set, autospec, new_callable, kwargs 1724 ) 1725 1726 1727class _patch_dict(object): 1728 """ 1729 Patch a dictionary, or dictionary like object, and restore the dictionary 1730 to its original state after the test. 1731 1732 `in_dict` can be a dictionary or a mapping like container. If it is a 1733 mapping then it must at least support getting, setting and deleting items 1734 plus iterating over keys. 1735 1736 `in_dict` can also be a string specifying the name of the dictionary, which 1737 will then be fetched by importing it. 1738 1739 `values` can be a dictionary of values to set in the dictionary. `values` 1740 can also be an iterable of `(key, value)` pairs. 1741 1742 If `clear` is True then the dictionary will be cleared before the new 1743 values are set. 1744 1745 `patch.dict` can also be called with arbitrary keyword arguments to set 1746 values in the dictionary:: 1747 1748 with patch.dict('sys.modules', mymodule=Mock(), other_module=Mock()): 1749 ... 1750 1751 `patch.dict` can be used as a context manager, decorator or class 1752 decorator. When used as a class decorator `patch.dict` honours 1753 `patch.TEST_PREFIX` for choosing which methods to wrap. 1754 """ 1755 1756 def __init__(self, in_dict, values=(), clear=False, **kwargs): 1757 self.in_dict = in_dict 1758 # support any argument supported by dict(...) constructor 1759 self.values = dict(values) 1760 self.values.update(kwargs) 1761 self.clear = clear 1762 self._original = None 1763 1764 1765 def __call__(self, f): 1766 if isinstance(f, type): 1767 return self.decorate_class(f) 1768 @wraps(f) 1769 def _inner(*args, **kw): 1770 self._patch_dict() 1771 try: 1772 return f(*args, **kw) 1773 finally: 1774 self._unpatch_dict() 1775 1776 return _inner 1777 1778 1779 def decorate_class(self, klass): 1780 for attr in dir(klass): 1781 attr_value = getattr(klass, attr) 1782 if (attr.startswith(patch.TEST_PREFIX) and 1783 hasattr(attr_value, "__call__")): 1784 decorator = _patch_dict(self.in_dict, self.values, self.clear) 1785 decorated = decorator(attr_value) 1786 setattr(klass, attr, decorated) 1787 return klass 1788 1789 1790 def __enter__(self): 1791 """Patch the dict.""" 1792 self._patch_dict() 1793 return self.in_dict 1794 1795 1796 def _patch_dict(self): 1797 values = self.values 1798 if isinstance(self.in_dict, str): 1799 self.in_dict = _importer(self.in_dict) 1800 in_dict = self.in_dict 1801 clear = self.clear 1802 1803 try: 1804 original = in_dict.copy() 1805 except AttributeError: 1806 # dict like object with no copy method 1807 # must support iteration over keys 1808 original = {} 1809 for key in in_dict: 1810 original[key] = in_dict[key] 1811 self._original = original 1812 1813 if clear: 1814 _clear_dict(in_dict) 1815 1816 try: 1817 in_dict.update(values) 1818 except AttributeError: 1819 # dict like object with no update method 1820 for key in values: 1821 in_dict[key] = values[key] 1822 1823 1824 def _unpatch_dict(self): 1825 in_dict = self.in_dict 1826 original = self._original 1827 1828 _clear_dict(in_dict) 1829 1830 try: 1831 in_dict.update(original) 1832 except AttributeError: 1833 for key in original: 1834 in_dict[key] = original[key] 1835 1836 1837 def __exit__(self, *args): 1838 """Unpatch the dict.""" 1839 self._unpatch_dict() 1840 return False 1841 1842 start = __enter__ 1843 stop = __exit__ 1844 1845 1846def _clear_dict(in_dict): 1847 try: 1848 in_dict.clear() 1849 except AttributeError: 1850 keys = list(in_dict) 1851 for key in keys: 1852 del in_dict[key] 1853 1854 1855def _patch_stopall(): 1856 """Stop all active patches. LIFO to unroll nested patches.""" 1857 for patch in reversed(_patch._active_patches): 1858 patch.stop() 1859 1860 1861patch.object = _patch_object 1862patch.dict = _patch_dict 1863patch.multiple = _patch_multiple 1864patch.stopall = _patch_stopall 1865patch.TEST_PREFIX = 'test' 1866 1867magic_methods = ( 1868 "lt le gt ge eq ne " 1869 "getitem setitem delitem " 1870 "len contains iter " 1871 "hash str sizeof " 1872 "enter exit " 1873 # we added divmod and rdivmod here instead of numerics 1874 # because there is no idivmod 1875 "divmod rdivmod neg pos abs invert " 1876 "complex int float index " 1877 "round trunc floor ceil " 1878 "bool next " 1879 "fspath " 1880 "aiter " 1881) 1882 1883numerics = ( 1884 "add sub mul matmul div floordiv mod lshift rshift and xor or pow truediv" 1885) 1886inplace = ' '.join('i%s' % n for n in numerics.split()) 1887right = ' '.join('r%s' % n for n in numerics.split()) 1888 1889# not including __prepare__, __instancecheck__, __subclasscheck__ 1890# (as they are metaclass methods) 1891# __del__ is not supported at all as it causes problems if it exists 1892 1893_non_defaults = { 1894 '__get__', '__set__', '__delete__', '__reversed__', '__missing__', 1895 '__reduce__', '__reduce_ex__', '__getinitargs__', '__getnewargs__', 1896 '__getstate__', '__setstate__', '__getformat__', '__setformat__', 1897 '__repr__', '__dir__', '__subclasses__', '__format__', 1898 '__getnewargs_ex__', 1899} 1900 1901 1902def _get_method(name, func): 1903 "Turns a callable object (like a mock) into a real function" 1904 def method(self, /, *args, **kw): 1905 return func(self, *args, **kw) 1906 method.__name__ = name 1907 return method 1908 1909 1910_magics = { 1911 '__%s__' % method for method in 1912 ' '.join([magic_methods, numerics, inplace, right]).split() 1913} 1914 1915# Magic methods used for async `with` statements 1916_async_method_magics = {"__aenter__", "__aexit__", "__anext__"} 1917# Magic methods that are only used with async calls but are synchronous functions themselves 1918_sync_async_magics = {"__aiter__"} 1919_async_magics = _async_method_magics | _sync_async_magics 1920 1921_all_sync_magics = _magics | _non_defaults 1922_all_magics = _all_sync_magics | _async_magics 1923 1924_unsupported_magics = { 1925 '__getattr__', '__setattr__', 1926 '__init__', '__new__', '__prepare__', 1927 '__instancecheck__', '__subclasscheck__', 1928 '__del__' 1929} 1930 1931_calculate_return_value = { 1932 '__hash__': lambda self: object.__hash__(self), 1933 '__str__': lambda self: object.__str__(self), 1934 '__sizeof__': lambda self: object.__sizeof__(self), 1935 '__fspath__': lambda self: f"{type(self).__name__}/{self._extract_mock_name()}/{id(self)}", 1936} 1937 1938_return_values = { 1939 '__lt__': NotImplemented, 1940 '__gt__': NotImplemented, 1941 '__le__': NotImplemented, 1942 '__ge__': NotImplemented, 1943 '__int__': 1, 1944 '__contains__': False, 1945 '__len__': 0, 1946 '__exit__': False, 1947 '__complex__': 1j, 1948 '__float__': 1.0, 1949 '__bool__': True, 1950 '__index__': 1, 1951 '__aexit__': False, 1952} 1953 1954 1955def _get_eq(self): 1956 def __eq__(other): 1957 ret_val = self.__eq__._mock_return_value 1958 if ret_val is not DEFAULT: 1959 return ret_val 1960 if self is other: 1961 return True 1962 return NotImplemented 1963 return __eq__ 1964 1965def _get_ne(self): 1966 def __ne__(other): 1967 if self.__ne__._mock_return_value is not DEFAULT: 1968 return DEFAULT 1969 if self is other: 1970 return False 1971 return NotImplemented 1972 return __ne__ 1973 1974def _get_iter(self): 1975 def __iter__(): 1976 ret_val = self.__iter__._mock_return_value 1977 if ret_val is DEFAULT: 1978 return iter([]) 1979 # if ret_val was already an iterator, then calling iter on it should 1980 # return the iterator unchanged 1981 return iter(ret_val) 1982 return __iter__ 1983 1984def _get_async_iter(self): 1985 def __aiter__(): 1986 ret_val = self.__aiter__._mock_return_value 1987 if ret_val is DEFAULT: 1988 return _AsyncIterator(iter([])) 1989 return _AsyncIterator(iter(ret_val)) 1990 return __aiter__ 1991 1992_side_effect_methods = { 1993 '__eq__': _get_eq, 1994 '__ne__': _get_ne, 1995 '__iter__': _get_iter, 1996 '__aiter__': _get_async_iter 1997} 1998 1999 2000 2001def _set_return_value(mock, method, name): 2002 fixed = _return_values.get(name, DEFAULT) 2003 if fixed is not DEFAULT: 2004 method.return_value = fixed 2005 return 2006 2007 return_calculator = _calculate_return_value.get(name) 2008 if return_calculator is not None: 2009 return_value = return_calculator(mock) 2010 method.return_value = return_value 2011 return 2012 2013 side_effector = _side_effect_methods.get(name) 2014 if side_effector is not None: 2015 method.side_effect = side_effector(mock) 2016 2017 2018 2019class MagicMixin(Base): 2020 def __init__(self, /, *args, **kw): 2021 self._mock_set_magics() # make magic work for kwargs in init 2022 _safe_super(MagicMixin, self).__init__(*args, **kw) 2023 self._mock_set_magics() # fix magic broken by upper level init 2024 2025 2026 def _mock_set_magics(self): 2027 orig_magics = _magics | _async_method_magics 2028 these_magics = orig_magics 2029 2030 if getattr(self, "_mock_methods", None) is not None: 2031 these_magics = orig_magics.intersection(self._mock_methods) 2032 2033 remove_magics = set() 2034 remove_magics = orig_magics - these_magics 2035 2036 for entry in remove_magics: 2037 if entry in type(self).__dict__: 2038 # remove unneeded magic methods 2039 delattr(self, entry) 2040 2041 # don't overwrite existing attributes if called a second time 2042 these_magics = these_magics - set(type(self).__dict__) 2043 2044 _type = type(self) 2045 for entry in these_magics: 2046 setattr(_type, entry, MagicProxy(entry, self)) 2047 2048 2049 2050class NonCallableMagicMock(MagicMixin, NonCallableMock): 2051 """A version of `MagicMock` that isn't callable.""" 2052 def mock_add_spec(self, spec, spec_set=False): 2053 """Add a spec to a mock. `spec` can either be an object or a 2054 list of strings. Only attributes on the `spec` can be fetched as 2055 attributes from the mock. 2056 2057 If `spec_set` is True then only attributes on the spec can be set.""" 2058 self._mock_add_spec(spec, spec_set) 2059 self._mock_set_magics() 2060 2061 2062class AsyncMagicMixin(MagicMixin): 2063 def __init__(self, /, *args, **kw): 2064 self._mock_set_magics() # make magic work for kwargs in init 2065 _safe_super(AsyncMagicMixin, self).__init__(*args, **kw) 2066 self._mock_set_magics() # fix magic broken by upper level init 2067 2068class MagicMock(MagicMixin, Mock): 2069 """ 2070 MagicMock is a subclass of Mock with default implementations 2071 of most of the magic methods. You can use MagicMock without having to 2072 configure the magic methods yourself. 2073 2074 If you use the `spec` or `spec_set` arguments then *only* magic 2075 methods that exist in the spec will be created. 2076 2077 Attributes and the return value of a `MagicMock` will also be `MagicMocks`. 2078 """ 2079 def mock_add_spec(self, spec, spec_set=False): 2080 """Add a spec to a mock. `spec` can either be an object or a 2081 list of strings. Only attributes on the `spec` can be fetched as 2082 attributes from the mock. 2083 2084 If `spec_set` is True then only attributes on the spec can be set.""" 2085 self._mock_add_spec(spec, spec_set) 2086 self._mock_set_magics() 2087 2088 2089 2090class MagicProxy(Base): 2091 def __init__(self, name, parent): 2092 self.name = name 2093 self.parent = parent 2094 2095 def create_mock(self): 2096 entry = self.name 2097 parent = self.parent 2098 m = parent._get_child_mock(name=entry, _new_name=entry, 2099 _new_parent=parent) 2100 setattr(parent, entry, m) 2101 _set_return_value(parent, m, entry) 2102 return m 2103 2104 def __get__(self, obj, _type=None): 2105 return self.create_mock() 2106 2107 2108class AsyncMockMixin(Base): 2109 await_count = _delegating_property('await_count') 2110 await_args = _delegating_property('await_args') 2111 await_args_list = _delegating_property('await_args_list') 2112 2113 def __init__(self, /, *args, **kwargs): 2114 super().__init__(*args, **kwargs) 2115 # asyncio.iscoroutinefunction() checks _is_coroutine property to say if an 2116 # object is a coroutine. Without this check it looks to see if it is a 2117 # function/method, which in this case it is not (since it is an 2118 # AsyncMock). 2119 # It is set through __dict__ because when spec_set is True, this 2120 # attribute is likely undefined. 2121 self.__dict__['_is_coroutine'] = asyncio.coroutines._is_coroutine 2122 self.__dict__['_mock_await_count'] = 0 2123 self.__dict__['_mock_await_args'] = None 2124 self.__dict__['_mock_await_args_list'] = _CallList() 2125 code_mock = NonCallableMock(spec_set=CodeType) 2126 code_mock.co_flags = inspect.CO_COROUTINE 2127 self.__dict__['__code__'] = code_mock 2128 2129 async def _execute_mock_call(self, /, *args, **kwargs): 2130 # This is nearly just like super(), except for sepcial handling 2131 # of coroutines 2132 2133 _call = self.call_args 2134 self.await_count += 1 2135 self.await_args = _call 2136 self.await_args_list.append(_call) 2137 2138 effect = self.side_effect 2139 if effect is not None: 2140 if _is_exception(effect): 2141 raise effect 2142 elif not _callable(effect): 2143 try: 2144 result = next(effect) 2145 except StopIteration: 2146 # It is impossible to propogate a StopIteration 2147 # through coroutines because of PEP 479 2148 raise StopAsyncIteration 2149 if _is_exception(result): 2150 raise result 2151 elif asyncio.iscoroutinefunction(effect): 2152 result = await effect(*args, **kwargs) 2153 else: 2154 result = effect(*args, **kwargs) 2155 2156 if result is not DEFAULT: 2157 return result 2158 2159 if self._mock_return_value is not DEFAULT: 2160 return self.return_value 2161 2162 if self._mock_wraps is not None: 2163 if asyncio.iscoroutinefunction(self._mock_wraps): 2164 return await self._mock_wraps(*args, **kwargs) 2165 return self._mock_wraps(*args, **kwargs) 2166 2167 return self.return_value 2168 2169 def assert_awaited(self): 2170 """ 2171 Assert that the mock was awaited at least once. 2172 """ 2173 if self.await_count == 0: 2174 msg = f"Expected {self._mock_name or 'mock'} to have been awaited." 2175 raise AssertionError(msg) 2176 2177 def assert_awaited_once(self): 2178 """ 2179 Assert that the mock was awaited exactly once. 2180 """ 2181 if not self.await_count == 1: 2182 msg = (f"Expected {self._mock_name or 'mock'} to have been awaited once." 2183 f" Awaited {self.await_count} times.") 2184 raise AssertionError(msg) 2185 2186 def assert_awaited_with(self, /, *args, **kwargs): 2187 """ 2188 Assert that the last await was with the specified arguments. 2189 """ 2190 if self.await_args is None: 2191 expected = self._format_mock_call_signature(args, kwargs) 2192 raise AssertionError(f'Expected await: {expected}\nNot awaited') 2193 2194 def _error_message(): 2195 msg = self._format_mock_failure_message(args, kwargs, action='await') 2196 return msg 2197 2198 expected = self._call_matcher((args, kwargs)) 2199 actual = self._call_matcher(self.await_args) 2200 if expected != actual: 2201 cause = expected if isinstance(expected, Exception) else None 2202 raise AssertionError(_error_message()) from cause 2203 2204 def assert_awaited_once_with(self, /, *args, **kwargs): 2205 """ 2206 Assert that the mock was awaited exactly once and with the specified 2207 arguments. 2208 """ 2209 if not self.await_count == 1: 2210 msg = (f"Expected {self._mock_name or 'mock'} to have been awaited once." 2211 f" Awaited {self.await_count} times.") 2212 raise AssertionError(msg) 2213 return self.assert_awaited_with(*args, **kwargs) 2214 2215 def assert_any_await(self, /, *args, **kwargs): 2216 """ 2217 Assert the mock has ever been awaited with the specified arguments. 2218 """ 2219 expected = self._call_matcher((args, kwargs)) 2220 actual = [self._call_matcher(c) for c in self.await_args_list] 2221 if expected not in actual: 2222 cause = expected if isinstance(expected, Exception) else None 2223 expected_string = self._format_mock_call_signature(args, kwargs) 2224 raise AssertionError( 2225 '%s await not found' % expected_string 2226 ) from cause 2227 2228 def assert_has_awaits(self, calls, any_order=False): 2229 """ 2230 Assert the mock has been awaited with the specified calls. 2231 The :attr:`await_args_list` list is checked for the awaits. 2232 2233 If `any_order` is False (the default) then the awaits must be 2234 sequential. There can be extra calls before or after the 2235 specified awaits. 2236 2237 If `any_order` is True then the awaits can be in any order, but 2238 they must all appear in :attr:`await_args_list`. 2239 """ 2240 expected = [self._call_matcher(c) for c in calls] 2241 cause = next((e for e in expected if isinstance(e, Exception)), None) 2242 all_awaits = _CallList(self._call_matcher(c) for c in self.await_args_list) 2243 if not any_order: 2244 if expected not in all_awaits: 2245 if cause is None: 2246 problem = 'Awaits not found.' 2247 else: 2248 problem = ('Error processing expected awaits.\n' 2249 'Errors: {}').format( 2250 [e if isinstance(e, Exception) else None 2251 for e in expected]) 2252 raise AssertionError( 2253 f'{problem}\n' 2254 f'Expected: {_CallList(calls)}\n' 2255 f'Actual: {self.await_args_list}' 2256 ) from cause 2257 return 2258 2259 all_awaits = list(all_awaits) 2260 2261 not_found = [] 2262 for kall in expected: 2263 try: 2264 all_awaits.remove(kall) 2265 except ValueError: 2266 not_found.append(kall) 2267 if not_found: 2268 raise AssertionError( 2269 '%r not all found in await list' % (tuple(not_found),) 2270 ) from cause 2271 2272 def assert_not_awaited(self): 2273 """ 2274 Assert that the mock was never awaited. 2275 """ 2276 if self.await_count != 0: 2277 msg = (f"Expected {self._mock_name or 'mock'} to not have been awaited." 2278 f" Awaited {self.await_count} times.") 2279 raise AssertionError(msg) 2280 2281 def reset_mock(self, /, *args, **kwargs): 2282 """ 2283 See :func:`.Mock.reset_mock()` 2284 """ 2285 super().reset_mock(*args, **kwargs) 2286 self.await_count = 0 2287 self.await_args = None 2288 self.await_args_list = _CallList() 2289 2290 2291class AsyncMock(AsyncMockMixin, AsyncMagicMixin, Mock): 2292 """ 2293 Enhance :class:`Mock` with features allowing to mock 2294 an async function. 2295 2296 The :class:`AsyncMock` object will behave so the object is 2297 recognized as an async function, and the result of a call is an awaitable: 2298 2299 >>> mock = AsyncMock() 2300 >>> asyncio.iscoroutinefunction(mock) 2301 True 2302 >>> inspect.isawaitable(mock()) 2303 True 2304 2305 2306 The result of ``mock()`` is an async function which will have the outcome 2307 of ``side_effect`` or ``return_value``: 2308 2309 - if ``side_effect`` is a function, the async function will return the 2310 result of that function, 2311 - if ``side_effect`` is an exception, the async function will raise the 2312 exception, 2313 - if ``side_effect`` is an iterable, the async function will return the 2314 next value of the iterable, however, if the sequence of result is 2315 exhausted, ``StopIteration`` is raised immediately, 2316 - if ``side_effect`` is not defined, the async function will return the 2317 value defined by ``return_value``, hence, by default, the async function 2318 returns a new :class:`AsyncMock` object. 2319 2320 If the outcome of ``side_effect`` or ``return_value`` is an async function, 2321 the mock async function obtained when the mock object is called will be this 2322 async function itself (and not an async function returning an async 2323 function). 2324 2325 The test author can also specify a wrapped object with ``wraps``. In this 2326 case, the :class:`Mock` object behavior is the same as with an 2327 :class:`.Mock` object: the wrapped object may have methods 2328 defined as async function functions. 2329 2330 Based on Martin Richard's asynctest project. 2331 """ 2332 2333 2334class _ANY(object): 2335 "A helper object that compares equal to everything." 2336 2337 def __eq__(self, other): 2338 return True 2339 2340 def __ne__(self, other): 2341 return False 2342 2343 def __repr__(self): 2344 return '<ANY>' 2345 2346ANY = _ANY() 2347 2348 2349 2350def _format_call_signature(name, args, kwargs): 2351 message = '%s(%%s)' % name 2352 formatted_args = '' 2353 args_string = ', '.join([repr(arg) for arg in args]) 2354 kwargs_string = ', '.join([ 2355 '%s=%r' % (key, value) for key, value in kwargs.items() 2356 ]) 2357 if args_string: 2358 formatted_args = args_string 2359 if kwargs_string: 2360 if formatted_args: 2361 formatted_args += ', ' 2362 formatted_args += kwargs_string 2363 2364 return message % formatted_args 2365 2366 2367 2368class _Call(tuple): 2369 """ 2370 A tuple for holding the results of a call to a mock, either in the form 2371 `(args, kwargs)` or `(name, args, kwargs)`. 2372 2373 If args or kwargs are empty then a call tuple will compare equal to 2374 a tuple without those values. This makes comparisons less verbose:: 2375 2376 _Call(('name', (), {})) == ('name',) 2377 _Call(('name', (1,), {})) == ('name', (1,)) 2378 _Call(((), {'a': 'b'})) == ({'a': 'b'},) 2379 2380 The `_Call` object provides a useful shortcut for comparing with call:: 2381 2382 _Call(((1, 2), {'a': 3})) == call(1, 2, a=3) 2383 _Call(('foo', (1, 2), {'a': 3})) == call.foo(1, 2, a=3) 2384 2385 If the _Call has no name then it will match any name. 2386 """ 2387 def __new__(cls, value=(), name='', parent=None, two=False, 2388 from_kall=True): 2389 args = () 2390 kwargs = {} 2391 _len = len(value) 2392 if _len == 3: 2393 name, args, kwargs = value 2394 elif _len == 2: 2395 first, second = value 2396 if isinstance(first, str): 2397 name = first 2398 if isinstance(second, tuple): 2399 args = second 2400 else: 2401 kwargs = second 2402 else: 2403 args, kwargs = first, second 2404 elif _len == 1: 2405 value, = value 2406 if isinstance(value, str): 2407 name = value 2408 elif isinstance(value, tuple): 2409 args = value 2410 else: 2411 kwargs = value 2412 2413 if two: 2414 return tuple.__new__(cls, (args, kwargs)) 2415 2416 return tuple.__new__(cls, (name, args, kwargs)) 2417 2418 2419 def __init__(self, value=(), name=None, parent=None, two=False, 2420 from_kall=True): 2421 self._mock_name = name 2422 self._mock_parent = parent 2423 self._mock_from_kall = from_kall 2424 2425 2426 def __eq__(self, other): 2427 if other is ANY: 2428 return True 2429 try: 2430 len_other = len(other) 2431 except TypeError: 2432 return False 2433 2434 self_name = '' 2435 if len(self) == 2: 2436 self_args, self_kwargs = self 2437 else: 2438 self_name, self_args, self_kwargs = self 2439 2440 if (getattr(self, '_mock_parent', None) and getattr(other, '_mock_parent', None) 2441 and self._mock_parent != other._mock_parent): 2442 return False 2443 2444 other_name = '' 2445 if len_other == 0: 2446 other_args, other_kwargs = (), {} 2447 elif len_other == 3: 2448 other_name, other_args, other_kwargs = other 2449 elif len_other == 1: 2450 value, = other 2451 if isinstance(value, tuple): 2452 other_args = value 2453 other_kwargs = {} 2454 elif isinstance(value, str): 2455 other_name = value 2456 other_args, other_kwargs = (), {} 2457 else: 2458 other_args = () 2459 other_kwargs = value 2460 elif len_other == 2: 2461 # could be (name, args) or (name, kwargs) or (args, kwargs) 2462 first, second = other 2463 if isinstance(first, str): 2464 other_name = first 2465 if isinstance(second, tuple): 2466 other_args, other_kwargs = second, {} 2467 else: 2468 other_args, other_kwargs = (), second 2469 else: 2470 other_args, other_kwargs = first, second 2471 else: 2472 return False 2473 2474 if self_name and other_name != self_name: 2475 return False 2476 2477 # this order is important for ANY to work! 2478 return (other_args, other_kwargs) == (self_args, self_kwargs) 2479 2480 2481 __ne__ = object.__ne__ 2482 2483 2484 def __call__(self, /, *args, **kwargs): 2485 if self._mock_name is None: 2486 return _Call(('', args, kwargs), name='()') 2487 2488 name = self._mock_name + '()' 2489 return _Call((self._mock_name, args, kwargs), name=name, parent=self) 2490 2491 2492 def __getattr__(self, attr): 2493 if self._mock_name is None: 2494 return _Call(name=attr, from_kall=False) 2495 name = '%s.%s' % (self._mock_name, attr) 2496 return _Call(name=name, parent=self, from_kall=False) 2497 2498 2499 def __getattribute__(self, attr): 2500 if attr in tuple.__dict__: 2501 raise AttributeError 2502 return tuple.__getattribute__(self, attr) 2503 2504 2505 def count(self, /, *args, **kwargs): 2506 return self.__getattr__('count')(*args, **kwargs) 2507 2508 def index(self, /, *args, **kwargs): 2509 return self.__getattr__('index')(*args, **kwargs) 2510 2511 def _get_call_arguments(self): 2512 if len(self) == 2: 2513 args, kwargs = self 2514 else: 2515 name, args, kwargs = self 2516 2517 return args, kwargs 2518 2519 @property 2520 def args(self): 2521 return self._get_call_arguments()[0] 2522 2523 @property 2524 def kwargs(self): 2525 return self._get_call_arguments()[1] 2526 2527 def __repr__(self): 2528 if not self._mock_from_kall: 2529 name = self._mock_name or 'call' 2530 if name.startswith('()'): 2531 name = 'call%s' % name 2532 return name 2533 2534 if len(self) == 2: 2535 name = 'call' 2536 args, kwargs = self 2537 else: 2538 name, args, kwargs = self 2539 if not name: 2540 name = 'call' 2541 elif not name.startswith('()'): 2542 name = 'call.%s' % name 2543 else: 2544 name = 'call%s' % name 2545 return _format_call_signature(name, args, kwargs) 2546 2547 2548 def call_list(self): 2549 """For a call object that represents multiple calls, `call_list` 2550 returns a list of all the intermediate calls as well as the 2551 final call.""" 2552 vals = [] 2553 thing = self 2554 while thing is not None: 2555 if thing._mock_from_kall: 2556 vals.append(thing) 2557 thing = thing._mock_parent 2558 return _CallList(reversed(vals)) 2559 2560 2561call = _Call(from_kall=False) 2562 2563 2564def create_autospec(spec, spec_set=False, instance=False, _parent=None, 2565 _name=None, **kwargs): 2566 """Create a mock object using another object as a spec. Attributes on the 2567 mock will use the corresponding attribute on the `spec` object as their 2568 spec. 2569 2570 Functions or methods being mocked will have their arguments checked 2571 to check that they are called with the correct signature. 2572 2573 If `spec_set` is True then attempting to set attributes that don't exist 2574 on the spec object will raise an `AttributeError`. 2575 2576 If a class is used as a spec then the return value of the mock (the 2577 instance of the class) will have the same spec. You can use a class as the 2578 spec for an instance object by passing `instance=True`. The returned mock 2579 will only be callable if instances of the mock are callable. 2580 2581 `create_autospec` also takes arbitrary keyword arguments that are passed to 2582 the constructor of the created mock.""" 2583 if _is_list(spec): 2584 # can't pass a list instance to the mock constructor as it will be 2585 # interpreted as a list of strings 2586 spec = type(spec) 2587 2588 is_type = isinstance(spec, type) 2589 is_async_func = _is_async_func(spec) 2590 _kwargs = {'spec': spec} 2591 if spec_set: 2592 _kwargs = {'spec_set': spec} 2593 elif spec is None: 2594 # None we mock with a normal mock without a spec 2595 _kwargs = {} 2596 if _kwargs and instance: 2597 _kwargs['_spec_as_instance'] = True 2598 2599 _kwargs.update(kwargs) 2600 2601 Klass = MagicMock 2602 if inspect.isdatadescriptor(spec): 2603 # descriptors don't have a spec 2604 # because we don't know what type they return 2605 _kwargs = {} 2606 elif is_async_func: 2607 if instance: 2608 raise RuntimeError("Instance can not be True when create_autospec " 2609 "is mocking an async function") 2610 Klass = AsyncMock 2611 elif not _callable(spec): 2612 Klass = NonCallableMagicMock 2613 elif is_type and instance and not _instance_callable(spec): 2614 Klass = NonCallableMagicMock 2615 2616 _name = _kwargs.pop('name', _name) 2617 2618 _new_name = _name 2619 if _parent is None: 2620 # for a top level object no _new_name should be set 2621 _new_name = '' 2622 2623 mock = Klass(parent=_parent, _new_parent=_parent, _new_name=_new_name, 2624 name=_name, **_kwargs) 2625 2626 if isinstance(spec, FunctionTypes): 2627 # should only happen at the top level because we don't 2628 # recurse for functions 2629 mock = _set_signature(mock, spec) 2630 if is_async_func: 2631 _setup_async_mock(mock) 2632 else: 2633 _check_signature(spec, mock, is_type, instance) 2634 2635 if _parent is not None and not instance: 2636 _parent._mock_children[_name] = mock 2637 2638 if is_type and not instance and 'return_value' not in kwargs: 2639 mock.return_value = create_autospec(spec, spec_set, instance=True, 2640 _name='()', _parent=mock) 2641 2642 for entry in dir(spec): 2643 if _is_magic(entry): 2644 # MagicMock already does the useful magic methods for us 2645 continue 2646 2647 # XXXX do we need a better way of getting attributes without 2648 # triggering code execution (?) Probably not - we need the actual 2649 # object to mock it so we would rather trigger a property than mock 2650 # the property descriptor. Likewise we want to mock out dynamically 2651 # provided attributes. 2652 # XXXX what about attributes that raise exceptions other than 2653 # AttributeError on being fetched? 2654 # we could be resilient against it, or catch and propagate the 2655 # exception when the attribute is fetched from the mock 2656 try: 2657 original = getattr(spec, entry) 2658 except AttributeError: 2659 continue 2660 2661 kwargs = {'spec': original} 2662 if spec_set: 2663 kwargs = {'spec_set': original} 2664 2665 if not isinstance(original, FunctionTypes): 2666 new = _SpecState(original, spec_set, mock, entry, instance) 2667 mock._mock_children[entry] = new 2668 else: 2669 parent = mock 2670 if isinstance(spec, FunctionTypes): 2671 parent = mock.mock 2672 2673 skipfirst = _must_skip(spec, entry, is_type) 2674 kwargs['_eat_self'] = skipfirst 2675 if asyncio.iscoroutinefunction(original): 2676 child_klass = AsyncMock 2677 else: 2678 child_klass = MagicMock 2679 new = child_klass(parent=parent, name=entry, _new_name=entry, 2680 _new_parent=parent, 2681 **kwargs) 2682 mock._mock_children[entry] = new 2683 _check_signature(original, new, skipfirst=skipfirst) 2684 2685 # so functions created with _set_signature become instance attributes, 2686 # *plus* their underlying mock exists in _mock_children of the parent 2687 # mock. Adding to _mock_children may be unnecessary where we are also 2688 # setting as an instance attribute? 2689 if isinstance(new, FunctionTypes): 2690 setattr(mock, entry, new) 2691 2692 return mock 2693 2694 2695def _must_skip(spec, entry, is_type): 2696 """ 2697 Return whether we should skip the first argument on spec's `entry` 2698 attribute. 2699 """ 2700 if not isinstance(spec, type): 2701 if entry in getattr(spec, '__dict__', {}): 2702 # instance attribute - shouldn't skip 2703 return False 2704 spec = spec.__class__ 2705 2706 for klass in spec.__mro__: 2707 result = klass.__dict__.get(entry, DEFAULT) 2708 if result is DEFAULT: 2709 continue 2710 if isinstance(result, (staticmethod, classmethod)): 2711 return False 2712 elif isinstance(getattr(result, '__get__', None), MethodWrapperTypes): 2713 # Normal method => skip if looked up on type 2714 # (if looked up on instance, self is already skipped) 2715 return is_type 2716 else: 2717 return False 2718 2719 # function is a dynamically provided attribute 2720 return is_type 2721 2722 2723class _SpecState(object): 2724 2725 def __init__(self, spec, spec_set=False, parent=None, 2726 name=None, ids=None, instance=False): 2727 self.spec = spec 2728 self.ids = ids 2729 self.spec_set = spec_set 2730 self.parent = parent 2731 self.instance = instance 2732 self.name = name 2733 2734 2735FunctionTypes = ( 2736 # python function 2737 type(create_autospec), 2738 # instance method 2739 type(ANY.__eq__), 2740) 2741 2742MethodWrapperTypes = ( 2743 type(ANY.__eq__.__get__), 2744) 2745 2746 2747file_spec = None 2748 2749 2750def _to_stream(read_data): 2751 if isinstance(read_data, bytes): 2752 return io.BytesIO(read_data) 2753 else: 2754 return io.StringIO(read_data) 2755 2756 2757def mock_open(mock=None, read_data=''): 2758 """ 2759 A helper function to create a mock to replace the use of `open`. It works 2760 for `open` called directly or used as a context manager. 2761 2762 The `mock` argument is the mock object to configure. If `None` (the 2763 default) then a `MagicMock` will be created for you, with the API limited 2764 to methods or attributes available on standard file handles. 2765 2766 `read_data` is a string for the `read`, `readline` and `readlines` of the 2767 file handle to return. This is an empty string by default. 2768 """ 2769 _read_data = _to_stream(read_data) 2770 _state = [_read_data, None] 2771 2772 def _readlines_side_effect(*args, **kwargs): 2773 if handle.readlines.return_value is not None: 2774 return handle.readlines.return_value 2775 return _state[0].readlines(*args, **kwargs) 2776 2777 def _read_side_effect(*args, **kwargs): 2778 if handle.read.return_value is not None: 2779 return handle.read.return_value 2780 return _state[0].read(*args, **kwargs) 2781 2782 def _readline_side_effect(*args, **kwargs): 2783 yield from _iter_side_effect() 2784 while True: 2785 yield _state[0].readline(*args, **kwargs) 2786 2787 def _iter_side_effect(): 2788 if handle.readline.return_value is not None: 2789 while True: 2790 yield handle.readline.return_value 2791 for line in _state[0]: 2792 yield line 2793 2794 def _next_side_effect(): 2795 if handle.readline.return_value is not None: 2796 return handle.readline.return_value 2797 return next(_state[0]) 2798 2799 global file_spec 2800 if file_spec is None: 2801 import _io 2802 file_spec = list(set(dir(_io.TextIOWrapper)).union(set(dir(_io.BytesIO)))) 2803 2804 if mock is None: 2805 mock = MagicMock(name='open', spec=open) 2806 2807 handle = MagicMock(spec=file_spec) 2808 handle.__enter__.return_value = handle 2809 2810 handle.write.return_value = None 2811 handle.read.return_value = None 2812 handle.readline.return_value = None 2813 handle.readlines.return_value = None 2814 2815 handle.read.side_effect = _read_side_effect 2816 _state[1] = _readline_side_effect() 2817 handle.readline.side_effect = _state[1] 2818 handle.readlines.side_effect = _readlines_side_effect 2819 handle.__iter__.side_effect = _iter_side_effect 2820 handle.__next__.side_effect = _next_side_effect 2821 2822 def reset_data(*args, **kwargs): 2823 _state[0] = _to_stream(read_data) 2824 if handle.readline.side_effect == _state[1]: 2825 # Only reset the side effect if the user hasn't overridden it. 2826 _state[1] = _readline_side_effect() 2827 handle.readline.side_effect = _state[1] 2828 return DEFAULT 2829 2830 mock.side_effect = reset_data 2831 mock.return_value = handle 2832 return mock 2833 2834 2835class PropertyMock(Mock): 2836 """ 2837 A mock intended to be used as a property, or other descriptor, on a class. 2838 `PropertyMock` provides `__get__` and `__set__` methods so you can specify 2839 a return value when it is fetched. 2840 2841 Fetching a `PropertyMock` instance from an object calls the mock, with 2842 no args. Setting it calls the mock with the value being set. 2843 """ 2844 def _get_child_mock(self, /, **kwargs): 2845 return MagicMock(**kwargs) 2846 2847 def __get__(self, obj, obj_type=None): 2848 return self() 2849 def __set__(self, obj, val): 2850 self(val) 2851 2852 2853def seal(mock): 2854 """Disable the automatic generation of child mocks. 2855 2856 Given an input Mock, seals it to ensure no further mocks will be generated 2857 when accessing an attribute that was not already defined. 2858 2859 The operation recursively seals the mock passed in, meaning that 2860 the mock itself, any mocks generated by accessing one of its attributes, 2861 and all assigned mocks without a name or spec will be sealed. 2862 """ 2863 mock._mock_sealed = True 2864 for attr in dir(mock): 2865 try: 2866 m = getattr(mock, attr) 2867 except AttributeError: 2868 continue 2869 if not isinstance(m, NonCallableMock): 2870 continue 2871 if m._mock_new_parent is mock: 2872 seal(m) 2873 2874 2875class _AsyncIterator: 2876 """ 2877 Wraps an iterator in an asynchronous iterator. 2878 """ 2879 def __init__(self, iterator): 2880 self.iterator = iterator 2881 code_mock = NonCallableMock(spec_set=CodeType) 2882 code_mock.co_flags = inspect.CO_ITERABLE_COROUTINE 2883 self.__dict__['__code__'] = code_mock 2884 2885 def __aiter__(self): 2886 return self 2887 2888 async def __anext__(self): 2889 try: 2890 return next(self.iterator) 2891 except StopIteration: 2892 pass 2893 raise StopAsyncIteration 2894