• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# mock.py
2# Test tools for mocking and patching.
3# Maintained by Michael Foord
4# Backport for other versions of Python available from
5# https://pypi.org/project/mock
6
# Names exported by a star-import of this module.
__all__ = (
    'Mock',
    'MagicMock',
    'patch',
    'sentinel',
    'DEFAULT',
    'ANY',
    'call',
    'create_autospec',
    'AsyncMock',
    'FILTER_DIR',
    'NonCallableMock',
    'NonCallableMagicMock',
    'mock_open',
    'PropertyMock',
    'seal',
)


# Version string of this module.
__version__ = '1.0'
27
28import asyncio
29import contextlib
30import io
31import inspect
32import pprint
33import sys
34import builtins
35from types import CodeType, ModuleType, MethodType
36from unittest.util import safe_repr
37from functools import wraps, partial
38
39
# Names of the public (non-underscore) attributes of the builtins module.
_builtins = {name for name in dir(builtins) if not name.startswith('_')}

# When true, NonCallableMock.__dir__ returns a filtered listing; when false
# it falls back to object.__dir__.
FILTER_DIR = True

# Workaround for issue #12370
# Without this, the __class__ properties wouldn't be set correctly
_safe_super = super
47
def _is_async_obj(obj):
    """Return whether `obj` is a coroutine function or an awaitable.

    Mock instances that are not `AsyncMock` instances are never treated as
    async.
    """
    plain_mock = _is_instance_mock(obj) and not isinstance(obj, AsyncMock)
    if plain_mock:
        return False
    return asyncio.iscoroutinefunction(obj) or inspect.isawaitable(obj)
52
53
54def _is_async_func(func):
55    if getattr(func, '__code__', None):
56        return asyncio.iscoroutinefunction(func)
57    else:
58        return False
59
60
def _is_instance_mock(obj):
    """Return True if `obj` is an instance of a mock class."""
    # isinstance() is unusable here because Mock objects override __class__,
    # so inspect the real type instead.  NonCallableMock is the base class
    # of every mock.
    real_type = type(obj)
    return issubclass(real_type, NonCallableMock)
65
66
67def _is_exception(obj):
68    return (
69        isinstance(obj, BaseException) or
70        isinstance(obj, type) and issubclass(obj, BaseException)
71    )
72
73
def _extract_mock(obj):
    """Return the mock behind `obj`.

    Autospecced functions are real function objects that carry the actual
    mock in a `mock` attribute; for those the attribute is returned.  Any
    other object is returned unchanged.
    """
    is_autospecced = isinstance(obj, FunctionTypes) and hasattr(obj, 'mock')
    return obj.mock if is_autospecced else obj
81
82
83def _get_signature_object(func, as_instance, eat_self):
84    """
85    Given an arbitrary, possibly callable object, try to create a suitable
86    signature object.
87    Return a (reduced func, signature) tuple, or None.
88    """
89    if isinstance(func, type) and not as_instance:
90        # If it's a type and should be modelled as a type, use __init__.
91        func = func.__init__
92        # Skip the `self` argument in __init__
93        eat_self = True
94    elif not isinstance(func, FunctionTypes):
95        # If we really want to model an instance of the passed type,
96        # __call__ should be looked up, not __init__.
97        try:
98            func = func.__call__
99        except AttributeError:
100            return None
101    if eat_self:
102        sig_func = partial(func, None)
103    else:
104        sig_func = func
105    try:
106        return func, inspect.signature(sig_func)
107    except ValueError:
108        # Certain callable types are not supported by inspect.signature()
109        return None
110
111
def _check_signature(func, mock, skipfirst, instance=False):
    """Install a signature check for `func` on the type of `mock`.

    Does nothing when no signature can be obtained for `func`.
    """
    found = _get_signature_object(func, instance, skipfirst)
    if found is None:
        return
    func, sig = found

    def checksig(self, /, *args, **kwargs):
        # Raises TypeError when the arguments do not fit the signature.
        sig.bind(*args, **kwargs)

    _copy_func_details(func, checksig)
    mock_type = type(mock)
    mock_type._mock_check_sig = checksig
    mock_type.__signature__ = sig
122
123
124def _copy_func_details(func, funcopy):
125    # we explicitly don't copy func.__dict__ into this copy as it would
126    # expose original attributes that should be mocked
127    for attribute in (
128        '__name__', '__doc__', '__text_signature__',
129        '__module__', '__defaults__', '__kwdefaults__',
130    ):
131        try:
132            setattr(funcopy, attribute, getattr(func, attribute))
133        except AttributeError:
134            pass
135
136
137def _callable(obj):
138    if isinstance(obj, type):
139        return True
140    if isinstance(obj, (staticmethod, classmethod, MethodType)):
141        return _callable(obj.__func__)
142    if getattr(obj, '__call__', None) is not None:
143        return True
144    return False
145
146
147def _is_list(obj):
148    # checks for list or tuples
149    # XXXX badly named!
150    return type(obj) in (list, tuple)
151
152
153def _instance_callable(obj):
154    """Given an object, return True if the object is callable.
155    For classes, return True if instances would be callable."""
156    if not isinstance(obj, type):
157        # already an instance
158        return getattr(obj, '__call__', None) is not None
159
160    # *could* be broken by a class overriding __mro__ or __dict__ via
161    # a metaclass
162    for base in (obj,) + obj.__mro__:
163        if base.__dict__.get('__call__') is not None:
164            return True
165    return False
166
167
def _set_signature(mock, original, instance=False):
    """Return a function that delegates to `mock` with `original`'s signature.

    The generated function takes (*args, **kwargs), first checks them against
    the signature of `original`, then calls `mock`.  If no signature can be
    obtained for `original`, `mock` itself is returned.
    """
    sig_info = _get_signature_object(
        original, instance, isinstance(original, type)
    )
    if sig_info is None:
        return mock
    func, sig = sig_info

    def checksig(*args, **kwargs):
        # Raises TypeError when the arguments do not fit the signature.
        sig.bind(*args, **kwargs)
    _copy_func_details(func, checksig)

    # The generated function needs a valid identifier as its name.
    name = original.__name__
    name = name if name.isidentifier() else 'funcopy'

    namespace = {'_checksig_': checksig, 'mock': mock}
    src = """def %s(*args, **kwargs):
    _checksig_(*args, **kwargs)
    return mock(*args, **kwargs)""" % name
    exec(src, namespace)

    funcopy = namespace[name]
    _setup_func(funcopy, mock, sig)
    return funcopy
193
194
def _setup_func(funcopy, mock, sig):
    """Give the function `funcopy` the call-tracking interface of `mock`.

    Assertion helpers and `reset_mock` are attached to `funcopy` as
    forwarding functions, the call-recording attributes are initialised, and
    `funcopy` is registered as the delegate of `mock`.
    """
    funcopy.mock = mock

    def assert_called_with(*args, **kwargs):
        return mock.assert_called_with(*args, **kwargs)

    def assert_called(*args, **kwargs):
        return mock.assert_called(*args, **kwargs)

    def assert_not_called(*args, **kwargs):
        return mock.assert_not_called(*args, **kwargs)

    def assert_called_once(*args, **kwargs):
        return mock.assert_called_once(*args, **kwargs)

    def assert_called_once_with(*args, **kwargs):
        return mock.assert_called_once_with(*args, **kwargs)

    def assert_has_calls(*args, **kwargs):
        return mock.assert_has_calls(*args, **kwargs)

    def assert_any_call(*args, **kwargs):
        return mock.assert_any_call(*args, **kwargs)

    def reset_mock():
        funcopy.method_calls = _CallList()
        funcopy.mock_calls = _CallList()
        mock.reset_mock()
        returned = funcopy.return_value
        # Also reset a mock return value, unless it is `mock` itself.
        if returned is not mock and _is_instance_mock(returned):
            returned.reset_mock()

    # Fresh call-recording state.
    initial_state = (
        ('called', False),
        ('call_count', 0),
        ('call_args', None),
        ('call_args_list', _CallList()),
        ('method_calls', _CallList()),
        ('mock_calls', _CallList()),
    )
    for attr_name, initial in initial_state:
        setattr(funcopy, attr_name, initial)

    # Configuration is read from the mock before the delegate is installed.
    funcopy.return_value = mock.return_value
    funcopy.side_effect = mock.side_effect
    funcopy._mock_children = mock._mock_children

    helpers = (
        assert_called_with,
        assert_called_once_with,
        assert_has_calls,
        assert_any_call,
        reset_mock,
        assert_called,
        assert_not_called,
        assert_called_once,
    )
    for helper in helpers:
        setattr(funcopy, helper.__name__, helper)
    funcopy.__signature__ = sig

    mock._mock_delegate = funcopy
242
243
def _setup_async_mock(mock):
    """Initialise await-tracking state and await assertions on `mock`."""
    mock._is_coroutine = asyncio.coroutines._is_coroutine
    mock.await_count = 0
    mock.await_args = None
    mock.await_args_list = _CallList()

    # Mock is not configured yet so the attributes are set
    # to a function and then the corresponding mock helper function
    # is called when the helper is accessed similar to _setup_func.
    def wrapper(attr, /, *args, **kwargs):
        return getattr(mock.mock, attr)(*args, **kwargs)

    await_assertions = (
        'assert_awaited',
        'assert_awaited_once',
        'assert_awaited_with',
        'assert_awaited_once_with',
        'assert_any_await',
        'assert_has_awaits',
        'assert_not_awaited',
    )
    for attribute in await_assertions:
        # partial() binds the current attribute name; installing `wrapper`
        # directly would late-bind and always see the last name of the loop.
        setattr(mock, attribute, partial(wrapper, attribute))
269
270
271def _is_magic(name):
272    return '__%s__' % name[2:-2] == name
273
274
275class _SentinelObject(object):
276    "A unique, named, sentinel object."
277    def __init__(self, name):
278        self.name = name
279
280    def __repr__(self):
281        return 'sentinel.%s' % self.name
282
283    def __reduce__(self):
284        return 'sentinel.%s' % self.name
285
286
287class _Sentinel(object):
288    """Access attributes to return a named object, usable as a sentinel."""
289    def __init__(self):
290        self._sentinels = {}
291
292    def __getattr__(self, name):
293        if name == '__bases__':
294            # Without this help(unittest.mock) raises an exception
295            raise AttributeError
296        return self._sentinels.setdefault(name, _SentinelObject(name))
297
298    def __reduce__(self):
299        return 'sentinel'
300
301
# Module-level sentinel factory: `sentinel.NAME` yields a named object that
# is cached per name.
sentinel = _Sentinel()

# Initial value of a mock's `_mock_return_value`.
DEFAULT = sentinel.DEFAULT
# Lookup default used by NonCallableMock.__delattr__ for "no such child".
_missing = sentinel.MISSING
# Stored in `_mock_children` in place of an attribute that was deleted.
_deleted = sentinel.DELETED


# Attribute names that NonCallableMock.__setattr__ hands straight to
# object.__setattr__; _delegating_property() adds further names.
_allowed_names = {
    'return_value', '_mock_return_value', 'side_effect',
    '_mock_side_effect', '_mock_parent', '_mock_new_parent',
    '_mock_name', '_mock_new_name'
}
314
315
def _delegating_property(name):
    """Build a property that defers to the mock's delegate when one is set.

    With no delegate (`_mock_delegate` is None) the value lives on the mock
    itself under `'_mock_' + name`; otherwise attribute `name` of the
    delegate is read and written.  `name` is also registered in
    `_allowed_names`.
    """
    _allowed_names.add(name)
    backing_name = '_mock_' + name

    def _get(self):
        delegate = self._mock_delegate
        if delegate is not None:
            return getattr(delegate, name)
        return getattr(self, backing_name)

    def _set(self, value):
        delegate = self._mock_delegate
        if delegate is not None:
            setattr(delegate, name, value)
        else:
            # Write to the instance dict directly, bypassing __setattr__.
            self.__dict__[backing_name] = value

    return property(_get, _set)
332
333
334
335class _CallList(list):
336
337    def __contains__(self, value):
338        if not isinstance(value, list):
339            return list.__contains__(self, value)
340        len_value = len(value)
341        len_self = len(self)
342        if len_value > len_self:
343            return False
344
345        for i in range(0, len_self - len_value + 1):
346            sub_list = self[i:i+len_value]
347            if sub_list == value:
348                return True
349        return False
350
351    def __repr__(self):
352        return pprint.pformat(list(self))
353
354
def _check_and_set_parent(parent, value, name, new_name):
    """Adopt `value` as a child of `parent` if it is an unowned mock.

    Returns False, leaving `value` untouched, when `value` is not a mock,
    already has a name or parent, or is `parent` or one of its ancestors.
    Otherwise sets the parent/name attributes selected by `name` and
    `new_name` and returns True.
    """
    value = _extract_mock(value)
    if not _is_instance_mock(value):
        return False

    already_owned = (
        (value._mock_name or value._mock_new_name) or
        (value._mock_parent is not None) or
        (value._mock_new_parent is not None)
    )
    if already_owned:
        return False

    # setting a mock (value) as a child or return value of itself
    # should not modify the mock
    ancestor = parent
    while ancestor is not None:
        if ancestor is value:
            return False
        ancestor = ancestor._mock_new_parent

    if new_name:
        value._mock_new_parent = parent
        value._mock_new_name = new_name
    if name:
        value._mock_parent = parent
        value._mock_name = name
    return True
380
381# Internal class to identify if we wrapped an iterator object or not.
382class _MockIter(object):
383    def __init__(self, obj):
384        self.obj = iter(obj)
385    def __next__(self):
386        return next(self.obj)
387
class Base:
    """Base class of the mocks; provides the default return value and side
    effect, and a constructor that ignores its arguments."""
    _mock_return_value = DEFAULT
    _mock_side_effect = None

    def __init__(self, /, *args, **kwargs):
        """Accept and ignore any arguments."""
393
394
395
396class NonCallableMock(Base):
397    """A non-callable version of `Mock`"""
398
399    def __new__(cls, /, *args, **kw):
400        # every instance has its own class
401        # so we can create magic methods on the
402        # class without stomping on other mocks
403        bases = (cls,)
404        if not issubclass(cls, AsyncMock):
405            # Check if spec is an async object or function
406            sig = inspect.signature(NonCallableMock.__init__)
407            bound_args = sig.bind_partial(cls, *args, **kw).arguments
408            spec_arg = [
409                arg for arg in bound_args.keys()
410                if arg.startswith('spec')
411            ]
412            if spec_arg:
413                # what if spec_set is different than spec?
414                if _is_async_obj(bound_args[spec_arg[0]]):
415                    bases = (AsyncMockMixin, cls,)
416        new = type(cls.__name__, bases, {'__doc__': cls.__doc__})
417        instance = _safe_super(NonCallableMock, cls).__new__(new)
418        return instance
419
420
421    def __init__(
422            self, spec=None, wraps=None, name=None, spec_set=None,
423            parent=None, _spec_state=None, _new_name='', _new_parent=None,
424            _spec_as_instance=False, _eat_self=None, unsafe=False, **kwargs
425        ):
426        if _new_parent is None:
427            _new_parent = parent
428
429        __dict__ = self.__dict__
430        __dict__['_mock_parent'] = parent
431        __dict__['_mock_name'] = name
432        __dict__['_mock_new_name'] = _new_name
433        __dict__['_mock_new_parent'] = _new_parent
434        __dict__['_mock_sealed'] = False
435
436        if spec_set is not None:
437            spec = spec_set
438            spec_set = True
439        if _eat_self is None:
440            _eat_self = parent is not None
441
442        self._mock_add_spec(spec, spec_set, _spec_as_instance, _eat_self)
443
444        __dict__['_mock_children'] = {}
445        __dict__['_mock_wraps'] = wraps
446        __dict__['_mock_delegate'] = None
447
448        __dict__['_mock_called'] = False
449        __dict__['_mock_call_args'] = None
450        __dict__['_mock_call_count'] = 0
451        __dict__['_mock_call_args_list'] = _CallList()
452        __dict__['_mock_mock_calls'] = _CallList()
453
454        __dict__['method_calls'] = _CallList()
455        __dict__['_mock_unsafe'] = unsafe
456
457        if kwargs:
458            self.configure_mock(**kwargs)
459
460        _safe_super(NonCallableMock, self).__init__(
461            spec, wraps, name, spec_set, parent,
462            _spec_state
463        )
464
465
466    def attach_mock(self, mock, attribute):
467        """
468        Attach a mock as an attribute of this one, replacing its name and
469        parent. Calls to the attached mock will be recorded in the
470        `method_calls` and `mock_calls` attributes of this one."""
471        inner_mock = _extract_mock(mock)
472
473        inner_mock._mock_parent = None
474        inner_mock._mock_new_parent = None
475        inner_mock._mock_name = ''
476        inner_mock._mock_new_name = None
477
478        setattr(self, attribute, mock)
479
480
481    def mock_add_spec(self, spec, spec_set=False):
482        """Add a spec to a mock. `spec` can either be an object or a
483        list of strings. Only attributes on the `spec` can be fetched as
484        attributes from the mock.
485
486        If `spec_set` is True then only attributes on the spec can be set."""
487        self._mock_add_spec(spec, spec_set)
488
489
490    def _mock_add_spec(self, spec, spec_set, _spec_as_instance=False,
491                       _eat_self=False):
492        _spec_class = None
493        _spec_signature = None
494        _spec_asyncs = []
495
496        for attr in dir(spec):
497            if asyncio.iscoroutinefunction(getattr(spec, attr, None)):
498                _spec_asyncs.append(attr)
499
500        if spec is not None and not _is_list(spec):
501            if isinstance(spec, type):
502                _spec_class = spec
503            else:
504                _spec_class = type(spec)
505            res = _get_signature_object(spec,
506                                        _spec_as_instance, _eat_self)
507            _spec_signature = res and res[1]
508
509            spec = dir(spec)
510
511        __dict__ = self.__dict__
512        __dict__['_spec_class'] = _spec_class
513        __dict__['_spec_set'] = spec_set
514        __dict__['_spec_signature'] = _spec_signature
515        __dict__['_mock_methods'] = spec
516        __dict__['_spec_asyncs'] = _spec_asyncs
517
518    def __get_return_value(self):
519        ret = self._mock_return_value
520        if self._mock_delegate is not None:
521            ret = self._mock_delegate.return_value
522
523        if ret is DEFAULT:
524            ret = self._get_child_mock(
525                _new_parent=self, _new_name='()'
526            )
527            self.return_value = ret
528        return ret
529
530
531    def __set_return_value(self, value):
532        if self._mock_delegate is not None:
533            self._mock_delegate.return_value = value
534        else:
535            self._mock_return_value = value
536            _check_and_set_parent(self, value, None, '()')
537
    # Public `return_value` property built from the private
    # __get_return_value / __set_return_value pair.
    __return_value_doc = "The value to be returned when the mock is called."
    return_value = property(__get_return_value, __set_return_value,
                            __return_value_doc)
541
542
543    @property
544    def __class__(self):
545        if self._spec_class is None:
546            return type(self)
547        return self._spec_class
548
    # Call-recording attributes; each is a property created by
    # _delegating_property(), which reads/writes the delegate when one is set.
    called = _delegating_property('called')
    call_count = _delegating_property('call_count')
    call_args = _delegating_property('call_args')
    call_args_list = _delegating_property('call_args_list')
    mock_calls = _delegating_property('mock_calls')
554
555
556    def __get_side_effect(self):
557        delegated = self._mock_delegate
558        if delegated is None:
559            return self._mock_side_effect
560        sf = delegated.side_effect
561        if (sf is not None and not callable(sf)
562                and not isinstance(sf, _MockIter) and not _is_exception(sf)):
563            sf = _MockIter(sf)
564            delegated.side_effect = sf
565        return sf
566
567    def __set_side_effect(self, value):
568        value = _try_iter(value)
569        delegated = self._mock_delegate
570        if delegated is None:
571            self._mock_side_effect = value
572        else:
573            delegated.side_effect = value
574
    # Public `side_effect` property built from the private getter/setter pair.
    side_effect = property(__get_side_effect, __set_side_effect)
576
577
578    def reset_mock(self,  visited=None,*, return_value=False, side_effect=False):
579        "Restore the mock object to its initial state."
580        if visited is None:
581            visited = []
582        if id(self) in visited:
583            return
584        visited.append(id(self))
585
586        self.called = False
587        self.call_args = None
588        self.call_count = 0
589        self.mock_calls = _CallList()
590        self.call_args_list = _CallList()
591        self.method_calls = _CallList()
592
593        if return_value:
594            self._mock_return_value = DEFAULT
595        if side_effect:
596            self._mock_side_effect = None
597
598        for child in self._mock_children.values():
599            if isinstance(child, _SpecState) or child is _deleted:
600                continue
601            child.reset_mock(visited)
602
603        ret = self._mock_return_value
604        if _is_instance_mock(ret) and ret is not self:
605            ret.reset_mock(visited)
606
607
608    def configure_mock(self, /, **kwargs):
609        """Set attributes on the mock through keyword arguments.
610
611        Attributes plus return values and side effects can be set on child
612        mocks using standard dot notation and unpacking a dictionary in the
613        method call:
614
615        >>> attrs = {'method.return_value': 3, 'other.side_effect': KeyError}
616        >>> mock.configure_mock(**attrs)"""
617        for arg, val in sorted(kwargs.items(),
618                               # we sort on the number of dots so that
619                               # attributes are set before we set attributes on
620                               # attributes
621                               key=lambda entry: entry[0].count('.')):
622            args = arg.split('.')
623            final = args.pop()
624            obj = self
625            for entry in args:
626                obj = getattr(obj, entry)
627            setattr(obj, final, val)
628
629
    def __getattr__(self, name):
        """Return the child mock for attribute `name`, creating it on demand.

        Raises AttributeError for `_mock_methods`/`_mock_unsafe` themselves,
        for names not allowed by the spec, for magic names, for names
        starting with 'assert'/'assret' (unless the mock is unsafe) and for
        attributes that were deleted.
        """
        # These two are read below; failing fast here avoids recursing back
        # into __getattr__ when they are not yet in the instance dict.
        if name in {'_mock_methods', '_mock_unsafe'}:
            raise AttributeError(name)
        elif self._mock_methods is not None:
            # A spec is in force: only its non-magic names are available.
            if name not in self._mock_methods or name in _all_magics:
                raise AttributeError("Mock object has no attribute %r" % name)
        elif _is_magic(name):
            raise AttributeError(name)
        if not self._mock_unsafe:
            if name.startswith(('assert', 'assret')):
                raise AttributeError("Attributes cannot start with 'assert' "
                                     "or 'assret'")

        result = self._mock_children.get(name)
        if result is _deleted:
            raise AttributeError(name)
        elif result is None:
            # First access: build a child mock and cache it.
            wraps = None
            if self._mock_wraps is not None:
                # XXXX should we get the attribute without triggering code
                # execution?
                wraps = getattr(self._mock_wraps, name)

            result = self._get_child_mock(
                parent=self, name=name, wraps=wraps, _new_name=name,
                _new_parent=self
            )
            self._mock_children[name]  = result

        elif isinstance(result, _SpecState):
            # Placeholder left for a spec attribute: replace it with the
            # autospecced mock now that it is actually accessed.
            result = create_autospec(
                result.spec, result.spec_set, result.instance,
                result.parent, result.name
            )
            self._mock_children[name]  = result

        return result
667
668
669    def _extract_mock_name(self):
670        _name_list = [self._mock_new_name]
671        _parent = self._mock_new_parent
672        last = self
673
674        dot = '.'
675        if _name_list == ['()']:
676            dot = ''
677
678        while _parent is not None:
679            last = _parent
680
681            _name_list.append(_parent._mock_new_name + dot)
682            dot = '.'
683            if _parent._mock_new_name == '()':
684                dot = ''
685
686            _parent = _parent._mock_new_parent
687
688        _name_list = list(reversed(_name_list))
689        _first = last._mock_name or 'mock'
690        if len(_name_list) > 1:
691            if _name_list[1] not in ('()', '().'):
692                _first += '.'
693        _name_list[0] = _first
694        return ''.join(_name_list)
695
696    def __repr__(self):
697        name = self._extract_mock_name()
698
699        name_string = ''
700        if name not in ('mock', 'mock.'):
701            name_string = ' name=%r' % name
702
703        spec_string = ''
704        if self._spec_class is not None:
705            spec_string = ' spec=%r'
706            if self._spec_set:
707                spec_string = ' spec_set=%r'
708            spec_string = spec_string % self._spec_class.__name__
709        return "<%s%s%s id='%s'>" % (
710            type(self).__name__,
711            name_string,
712            spec_string,
713            id(self)
714        )
715
716
717    def __dir__(self):
718        """Filter the output of `dir(mock)` to only useful members."""
719        if not FILTER_DIR:
720            return object.__dir__(self)
721
722        extras = self._mock_methods or []
723        from_type = dir(type(self))
724        from_dict = list(self.__dict__)
725        from_child_mocks = [
726            m_name for m_name, m_value in self._mock_children.items()
727            if m_value is not _deleted]
728
729        from_type = [e for e in from_type if not e.startswith('_')]
730        from_dict = [e for e in from_dict if not e.startswith('_') or
731                     _is_magic(e)]
732        return sorted(set(extras + from_type + from_dict + from_child_mocks))
733
734
    def __setattr__(self, name, value):
        """Set attribute `name`, enforcing the spec and the seal.

        Magic methods are installed on the mock's type, and mock values are
        registered as children.  Raises AttributeError for names rejected by
        a spec, for unsupported magic methods, and for new attributes on a
        sealed mock.
        """
        if name in _allowed_names:
            # property setters go through here
            return object.__setattr__(self, name, value)
        elif (self._spec_set and self._mock_methods is not None and
            name not in self._mock_methods and
            name not in self.__dict__):
            raise AttributeError("Mock object has no attribute '%s'" % name)
        elif name in _unsupported_magics:
            msg = 'Attempting to set unsupported magic method %r.' % name
            raise AttributeError(msg)
        elif name in _all_magics:
            if self._mock_methods is not None and name not in self._mock_methods:
                raise AttributeError("Mock object has no attribute '%s'" % name)

            if not _is_instance_mock(value):
                # Plain callable: install a method on the type and keep a
                # wrapper on the instance that passes `self` along.
                setattr(type(self), name, _get_method(name, value))
                original = value
                value = lambda *args, **kw: original(self, *args, **kw)
            else:
                # only set _new_name and not name so that mock_calls is tracked
                # but not method calls
                _check_and_set_parent(self, value, None, name)
                setattr(type(self), name, value)
                self._mock_children[name] = value
        elif name == '__class__':
            # Stored as the spec class; see the __class__ property.
            self._spec_class = value
            return
        else:
            # Ordinary attribute: register it as a child if it was adopted.
            if _check_and_set_parent(self, value, name, name):
                self._mock_children[name] = value

        if self._mock_sealed and not hasattr(self, name):
            mock_name = f'{self._extract_mock_name()}.{name}'
            raise AttributeError(f'Cannot set {mock_name}')

        return object.__setattr__(self, name, value)
772
773
    def __delattr__(self, name):
        """Delete attribute `name` and mark it as deleted.

        Raises AttributeError when the attribute is not in the instance dict
        and was already deleted.
        """
        if name in _all_magics and name in type(self).__dict__:
            # Magic methods live on the type.
            delattr(type(self), name)
            if name not in self.__dict__:
                # for magic methods that are still MagicProxy objects and
                # not set on the instance itself
                return

        # _missing distinguishes "no child entry" from a stored value.
        obj = self._mock_children.get(name, _missing)
        if name in self.__dict__:
            _safe_super(NonCallableMock, self).__delattr__(name)
        elif obj is _deleted:
            raise AttributeError(name)
        if obj is not _missing:
            del self._mock_children[name]
        # Leave a marker so that later lookups of this name fail.
        self._mock_children[name] = _deleted
790
791
792    def _format_mock_call_signature(self, args, kwargs):
793        name = self._mock_name or 'mock'
794        return _format_call_signature(name, args, kwargs)
795
796
797    def _format_mock_failure_message(self, args, kwargs, action='call'):
798        message = 'expected %s not found.\nExpected: %s\nActual: %s'
799        expected_string = self._format_mock_call_signature(args, kwargs)
800        call_args = self.call_args
801        actual_string = self._format_mock_call_signature(*call_args)
802        return message % (action, expected_string, actual_string)
803
804
805    def _get_call_signature_from_name(self, name):
806        """
807        * If call objects are asserted against a method/function like obj.meth1
808        then there could be no name for the call object to lookup. Hence just
809        return the spec_signature of the method/function being asserted against.
810        * If the name is not empty then remove () and split by '.' to get
811        list of names to iterate through the children until a potential
812        match is found. A child mock is created only during attribute access
813        so if we get a _SpecState then no attributes of the spec were accessed
814        and can be safely exited.
815        """
816        if not name:
817            return self._spec_signature
818
819        sig = None
820        names = name.replace('()', '').split('.')
821        children = self._mock_children
822
823        for name in names:
824            child = children.get(name)
825            if child is None or isinstance(child, _SpecState):
826                break
827            else:
828                children = child._mock_children
829                sig = child._spec_signature
830
831        return sig
832
833
834    def _call_matcher(self, _call):
835        """
836        Given a call (or simply an (args, kwargs) tuple), return a
837        comparison key suitable for matching with other calls.
838        This is a best effort method which relies on the spec's signature,
839        if available, or falls back on the arguments themselves.
840        """
841
842        if isinstance(_call, tuple) and len(_call) > 2:
843            sig = self._get_call_signature_from_name(_call[0])
844        else:
845            sig = self._spec_signature
846
847        if sig is not None:
848            if len(_call) == 2:
849                name = ''
850                args, kwargs = _call
851            else:
852                name, args, kwargs = _call
853            try:
854                return name, sig.bind(*args, **kwargs)
855            except TypeError as e:
856                return e.with_traceback(None)
857        else:
858            return _call
859
860    def assert_not_called(self):
861        """assert that the mock was never called.
862        """
863        if self.call_count != 0:
864            msg = ("Expected '%s' to not have been called. Called %s times.%s"
865                   % (self._mock_name or 'mock',
866                      self.call_count,
867                      self._calls_repr()))
868            raise AssertionError(msg)
869
870    def assert_called(self):
871        """assert that the mock was called at least once
872        """
873        if self.call_count == 0:
874            msg = ("Expected '%s' to have been called." %
875                   (self._mock_name or 'mock'))
876            raise AssertionError(msg)
877
878    def assert_called_once(self):
879        """assert that the mock was called only once.
880        """
881        if not self.call_count == 1:
882            msg = ("Expected '%s' to have been called once. Called %s times.%s"
883                   % (self._mock_name or 'mock',
884                      self.call_count,
885                      self._calls_repr()))
886            raise AssertionError(msg)
887
888    def assert_called_with(self, /, *args, **kwargs):
889        """assert that the last call was made with the specified arguments.
890
891        Raises an AssertionError if the args and keyword args passed in are
892        different to the last call to the mock."""
893        if self.call_args is None:
894            expected = self._format_mock_call_signature(args, kwargs)
895            actual = 'not called.'
896            error_message = ('expected call not found.\nExpected: %s\nActual: %s'
897                    % (expected, actual))
898            raise AssertionError(error_message)
899
900        def _error_message():
901            msg = self._format_mock_failure_message(args, kwargs)
902            return msg
903        expected = self._call_matcher((args, kwargs))
904        actual = self._call_matcher(self.call_args)
905        if expected != actual:
906            cause = expected if isinstance(expected, Exception) else None
907            raise AssertionError(_error_message()) from cause
908
909
910    def assert_called_once_with(self, /, *args, **kwargs):
911        """assert that the mock was called exactly once and that that call was
912        with the specified arguments."""
913        if not self.call_count == 1:
914            msg = ("Expected '%s' to be called once. Called %s times.%s"
915                   % (self._mock_name or 'mock',
916                      self.call_count,
917                      self._calls_repr()))
918            raise AssertionError(msg)
919        return self.assert_called_with(*args, **kwargs)
920
921
922    def assert_has_calls(self, calls, any_order=False):
923        """assert the mock has been called with the specified calls.
924        The `mock_calls` list is checked for the calls.
925
926        If `any_order` is False (the default) then the calls must be
927        sequential. There can be extra calls before or after the
928        specified calls.
929
930        If `any_order` is True then the calls can be in any order, but
931        they must all appear in `mock_calls`."""
932        expected = [self._call_matcher(c) for c in calls]
933        cause = next((e for e in expected if isinstance(e, Exception)), None)
934        all_calls = _CallList(self._call_matcher(c) for c in self.mock_calls)
935        if not any_order:
936            if expected not in all_calls:
937                if cause is None:
938                    problem = 'Calls not found.'
939                else:
940                    problem = ('Error processing expected calls.\n'
941                               'Errors: {}').format(
942                                   [e if isinstance(e, Exception) else None
943                                    for e in expected])
944                raise AssertionError(
945                    f'{problem}\n'
946                    f'Expected: {_CallList(calls)}'
947                    f'{self._calls_repr(prefix="Actual").rstrip(".")}'
948                ) from cause
949            return
950
951        all_calls = list(all_calls)
952
953        not_found = []
954        for kall in expected:
955            try:
956                all_calls.remove(kall)
957            except ValueError:
958                not_found.append(kall)
959        if not_found:
960            raise AssertionError(
961                '%r does not contain all of %r in its call list, '
962                'found %r instead' % (self._mock_name or 'mock',
963                                      tuple(not_found), all_calls)
964            ) from cause
965
966
967    def assert_any_call(self, /, *args, **kwargs):
968        """assert the mock has been called with the specified arguments.
969
970        The assert passes if the mock has *ever* been called, unlike
971        `assert_called_with` and `assert_called_once_with` that only pass if
972        the call is the most recent one."""
973        expected = self._call_matcher((args, kwargs))
974        actual = [self._call_matcher(c) for c in self.call_args_list]
975        if expected not in actual:
976            cause = expected if isinstance(expected, Exception) else None
977            expected_string = self._format_mock_call_signature(args, kwargs)
978            raise AssertionError(
979                '%s call not found' % expected_string
980            ) from cause
981
982
983    def _get_child_mock(self, /, **kw):
984        """Create the child mocks for attributes and return value.
985        By default child mocks will be the same type as the parent.
986        Subclasses of Mock may want to override this to customize the way
987        child mocks are made.
988
989        For non-callable mocks the callable variant will be used (rather than
990        any custom subclass)."""
991        _new_name = kw.get("_new_name")
992        if _new_name in self.__dict__['_spec_asyncs']:
993            return AsyncMock(**kw)
994
995        _type = type(self)
996        if issubclass(_type, MagicMock) and _new_name in _async_method_magics:
997            # Any asynchronous magic becomes an AsyncMock
998            klass = AsyncMock
999        elif issubclass(_type, AsyncMockMixin):
1000            if (_new_name in _all_sync_magics or
1001                    self._mock_methods and _new_name in self._mock_methods):
1002                # Any synchronous method on AsyncMock becomes a MagicMock
1003                klass = MagicMock
1004            else:
1005                klass = AsyncMock
1006        elif not issubclass(_type, CallableMixin):
1007            if issubclass(_type, NonCallableMagicMock):
1008                klass = MagicMock
1009            elif issubclass(_type, NonCallableMock):
1010                klass = Mock
1011        else:
1012            klass = _type.__mro__[1]
1013
1014        if self._mock_sealed:
1015            attribute = "." + kw["name"] if "name" in kw else "()"
1016            mock_name = self._extract_mock_name() + attribute
1017            raise AttributeError(mock_name)
1018
1019        return klass(**kw)
1020
1021
1022    def _calls_repr(self, prefix="Calls"):
1023        """Renders self.mock_calls as a string.
1024
1025        Example: "\nCalls: [call(1), call(2)]."
1026
1027        If self.mock_calls is empty, an empty string is returned. The
1028        output will be truncated if very long.
1029        """
1030        if not self.mock_calls:
1031            return ""
1032        return f"\n{prefix}: {safe_repr(self.mock_calls)}."
1033
1034
1035
1036def _try_iter(obj):
1037    if obj is None:
1038        return obj
1039    if _is_exception(obj):
1040        return obj
1041    if _callable(obj):
1042        return obj
1043    try:
1044        return iter(obj)
1045    except TypeError:
1046        # XXXX backwards compatibility
1047        # but this will blow up on first call - so maybe we should fail early?
1048        return obj
1049
1050
class CallableMixin(Base):
    # Adds call behaviour to a mock: signature checking, call recording on
    # the mock and its ancestors, and computing the result of a call.

    def __init__(self, spec=None, side_effect=None, return_value=DEFAULT,
                 wraps=None, name=None, spec_set=None, parent=None,
                 _spec_state=None, _new_name='', _new_parent=None, **kwargs):
        # The return value is written straight into the instance dict
        # before the rest of the initialisation runs.
        self.__dict__['_mock_return_value'] = return_value
        base_init = _safe_super(CallableMixin, self).__init__
        base_init(
            spec, wraps, name, spec_set, parent,
            _spec_state, _new_name, _new_parent, **kwargs
        )

        self.side_effect = side_effect


    def _mock_check_sig(self, /, *args, **kwargs):
        # Placeholder that accepts any arguments; it can be swapped for a
        # function that enforces a specific signature.
        pass


    def __call__(self, /, *args, **kwargs):
        # `self` is positional-only so that a mocked function or method
        # which itself takes a `self` keyword can still be called.
        self._mock_check_sig(*args, **kwargs)
        self._increment_mock_call(*args, **kwargs)
        return self._mock_call(*args, **kwargs)


    def _mock_call(self, /, *args, **kwargs):
        return self._execute_mock_call(*args, **kwargs)

    def _increment_mock_call(self, /, *args, **kwargs):
        self.called = True
        self.call_count += 1

        # call_args is recorded here, before the call is executed, so that
        # assertions on call arguments pass before execution in the case of
        # awaited calls.
        latest = _Call((args, kwargs), two=True)
        self.call_args = latest
        self.call_args_list.append(latest)

        # Starting state for the method_calls bookkeeping.
        track_method_calls = self._mock_parent is not None
        method_path = self._mock_name

        # Starting state for the mock_calls bookkeeping.
        call_path = self._mock_new_name
        path_is_call = call_path == '()'
        self.mock_calls.append(_Call(('', args, kwargs)))

        # Walk up through the ancestors, recording the call on each one
        # under a progressively longer name.
        ancestor = self._mock_new_parent
        while ancestor is not None:

            # method_calls: recorded only while the `_mock_parent` chain
            # is unbroken.
            if track_method_calls:
                ancestor.method_calls.append(
                    _Call((method_path, args, kwargs)))
                track_method_calls = ancestor._mock_parent is not None
                if track_method_calls:
                    method_path = ancestor._mock_name + '.' + method_path

            # mock_calls: recorded on every ancestor.
            ancestor.mock_calls.append(_Call((call_path, args, kwargs)))

            ancestor_name = ancestor._mock_new_name
            if ancestor_name:
                # No dot is inserted in front of a '()' component.
                separator = '' if path_is_call else '.'
                path_is_call = ancestor_name == '()'
                call_path = ancestor_name + separator + call_path

            ancestor = ancestor._mock_new_parent

    def _execute_mock_call(self, /, *args, **kwargs):
        # Kept apart from _increment_mock_call so that awaited functions
        # are executed separately from their call; AsyncMock overrides
        # this method.

        side_effect = self.side_effect
        if side_effect is not None:
            if _is_exception(side_effect):
                raise side_effect
            if _callable(side_effect):
                outcome = side_effect(*args, **kwargs)
            else:
                # Anything else is consumed one item per call.
                outcome = next(side_effect)
                if _is_exception(outcome):
                    raise outcome

            if outcome is not DEFAULT:
                return outcome

        # The wrapped object is only consulted when no explicit return
        # value has been configured.
        if self._mock_return_value is DEFAULT and self._mock_wraps is not None:
            return self._mock_wraps(*args, **kwargs)

        return self.return_value
1152
1153
1154
class Mock(CallableMixin, NonCallableMock):
    """
    Create a new `Mock` object. `Mock` takes several optional arguments
    that specify the behaviour of the Mock object:

    * `spec`: This can be either a list of strings or an existing object (a
      class or instance) that acts as the specification for the mock object. If
      you pass in an object then a list of strings is formed by calling dir on
      the object (excluding unsupported magic attributes and methods). Accessing
      any attribute not in this list will raise an `AttributeError`.

      If `spec` is an object (rather than a list of strings) then
      `mock.__class__` returns the class of the spec object. This allows mocks
      to pass `isinstance` tests.

    * `spec_set`: A stricter variant of `spec`. If used, attempting to *set*
      or get an attribute on the mock that isn't on the object passed as
      `spec_set` will raise an `AttributeError`.

    * `side_effect`: A function to be called whenever the Mock is called. See
      the `side_effect` attribute. Useful for raising exceptions or
      dynamically changing return values. The function is called with the same
      arguments as the mock, and unless it returns `DEFAULT`, the return
      value of this function is used as the return value.

      If `side_effect` is an exception class or instance then it is raised
      when the mock is called.

      If `side_effect` is an iterable then each call to the mock will return
      the next value from the iterable. If any of the members of the iterable
      are exceptions they will be raised instead of returned.

    * `return_value`: The value returned when the mock is called. By default
      this is a new Mock (created on first access). See the
      `return_value` attribute.

    * `wraps`: Item for the mock object to wrap. If `wraps` is not None then
      calling the Mock will pass the call through to the wrapped object
      (returning the real result). Attribute access on the mock will return a
      Mock object that wraps the corresponding attribute of the wrapped object
      (so attempting to access an attribute that doesn't exist will raise an
      `AttributeError`).

      If the mock has an explicit `return_value` set then calls are not passed
      to the wrapped object and the `return_value` is returned instead.

    * `name`: If the mock has a name then it will be used in the repr of the
      mock. This can be useful for debugging. The name is propagated to child
      mocks.

    Mocks can also be called with arbitrary keyword arguments. These will be
    used to set attributes on the mock after it is created.
    """
1205
1206
1207def _dot_lookup(thing, comp, import_path):
1208    try:
1209        return getattr(thing, comp)
1210    except AttributeError:
1211        __import__(import_path)
1212        return getattr(thing, comp)
1213
1214
1215def _importer(target):
1216    components = target.split('.')
1217    import_path = components.pop(0)
1218    thing = __import__(import_path)
1219
1220    for comp in components:
1221        import_path += ".%s" % comp
1222        thing = _dot_lookup(thing, comp, import_path)
1223    return thing
1224
1225
1226def _is_started(patcher):
1227    # XXXX horrible
1228    return hasattr(patcher, 'is_local')
1229
1230
1231class _patch(object):
1232
1233    attribute_name = None
1234    _active_patches = []
1235
1236    def __init__(
1237            self, getter, attribute, new, spec, create,
1238            spec_set, autospec, new_callable, kwargs
1239        ):
1240        if new_callable is not None:
1241            if new is not DEFAULT:
1242                raise ValueError(
1243                    "Cannot use 'new' and 'new_callable' together"
1244                )
1245            if autospec is not None:
1246                raise ValueError(
1247                    "Cannot use 'autospec' and 'new_callable' together"
1248                )
1249
1250        self.getter = getter
1251        self.attribute = attribute
1252        self.new = new
1253        self.new_callable = new_callable
1254        self.spec = spec
1255        self.create = create
1256        self.has_local = False
1257        self.spec_set = spec_set
1258        self.autospec = autospec
1259        self.kwargs = kwargs
1260        self.additional_patchers = []
1261
1262
1263    def copy(self):
1264        patcher = _patch(
1265            self.getter, self.attribute, self.new, self.spec,
1266            self.create, self.spec_set,
1267            self.autospec, self.new_callable, self.kwargs
1268        )
1269        patcher.attribute_name = self.attribute_name
1270        patcher.additional_patchers = [
1271            p.copy() for p in self.additional_patchers
1272        ]
1273        return patcher
1274
1275
1276    def __call__(self, func):
1277        if isinstance(func, type):
1278            return self.decorate_class(func)
1279        if inspect.iscoroutinefunction(func):
1280            return self.decorate_async_callable(func)
1281        return self.decorate_callable(func)
1282
1283
1284    def decorate_class(self, klass):
1285        for attr in dir(klass):
1286            if not attr.startswith(patch.TEST_PREFIX):
1287                continue
1288
1289            attr_value = getattr(klass, attr)
1290            if not hasattr(attr_value, "__call__"):
1291                continue
1292
1293            patcher = self.copy()
1294            setattr(klass, attr, patcher(attr_value))
1295        return klass
1296
1297
1298    @contextlib.contextmanager
1299    def decoration_helper(self, patched, args, keywargs):
1300        extra_args = []
1301        entered_patchers = []
1302        patching = None
1303
1304        exc_info = tuple()
1305        try:
1306            for patching in patched.patchings:
1307                arg = patching.__enter__()
1308                entered_patchers.append(patching)
1309                if patching.attribute_name is not None:
1310                    keywargs.update(arg)
1311                elif patching.new is DEFAULT:
1312                    extra_args.append(arg)
1313
1314            args += tuple(extra_args)
1315            yield (args, keywargs)
1316        except:
1317            if (patching not in entered_patchers and
1318                _is_started(patching)):
1319                # the patcher may have been started, but an exception
1320                # raised whilst entering one of its additional_patchers
1321                entered_patchers.append(patching)
1322            # Pass the exception to __exit__
1323            exc_info = sys.exc_info()
1324            # re-raise the exception
1325            raise
1326        finally:
1327            for patching in reversed(entered_patchers):
1328                patching.__exit__(*exc_info)
1329
1330
1331    def decorate_callable(self, func):
1332        # NB. Keep the method in sync with decorate_async_callable()
1333        if hasattr(func, 'patchings'):
1334            func.patchings.append(self)
1335            return func
1336
1337        @wraps(func)
1338        def patched(*args, **keywargs):
1339            with self.decoration_helper(patched,
1340                                        args,
1341                                        keywargs) as (newargs, newkeywargs):
1342                return func(*newargs, **newkeywargs)
1343
1344        patched.patchings = [self]
1345        return patched
1346
1347
1348    def decorate_async_callable(self, func):
1349        # NB. Keep the method in sync with decorate_callable()
1350        if hasattr(func, 'patchings'):
1351            func.patchings.append(self)
1352            return func
1353
1354        @wraps(func)
1355        async def patched(*args, **keywargs):
1356            with self.decoration_helper(patched,
1357                                        args,
1358                                        keywargs) as (newargs, newkeywargs):
1359                return await func(*newargs, **newkeywargs)
1360
1361        patched.patchings = [self]
1362        return patched
1363
1364
1365    def get_original(self):
1366        target = self.getter()
1367        name = self.attribute
1368
1369        original = DEFAULT
1370        local = False
1371
1372        try:
1373            original = target.__dict__[name]
1374        except (AttributeError, KeyError):
1375            original = getattr(target, name, DEFAULT)
1376        else:
1377            local = True
1378
1379        if name in _builtins and isinstance(target, ModuleType):
1380            self.create = True
1381
1382        if not self.create and original is DEFAULT:
1383            raise AttributeError(
1384                "%s does not have the attribute %r" % (target, name)
1385            )
1386        return original, local
1387
1388
1389    def __enter__(self):
1390        """Perform the patch."""
1391        new, spec, spec_set = self.new, self.spec, self.spec_set
1392        autospec, kwargs = self.autospec, self.kwargs
1393        new_callable = self.new_callable
1394        self.target = self.getter()
1395
1396        # normalise False to None
1397        if spec is False:
1398            spec = None
1399        if spec_set is False:
1400            spec_set = None
1401        if autospec is False:
1402            autospec = None
1403
1404        if spec is not None and autospec is not None:
1405            raise TypeError("Can't specify spec and autospec")
1406        if ((spec is not None or autospec is not None) and
1407            spec_set not in (True, None)):
1408            raise TypeError("Can't provide explicit spec_set *and* spec or autospec")
1409
1410        original, local = self.get_original()
1411
1412        if new is DEFAULT and autospec is None:
1413            inherit = False
1414            if spec is True:
1415                # set spec to the object we are replacing
1416                spec = original
1417                if spec_set is True:
1418                    spec_set = original
1419                    spec = None
1420            elif spec is not None:
1421                if spec_set is True:
1422                    spec_set = spec
1423                    spec = None
1424            elif spec_set is True:
1425                spec_set = original
1426
1427            if spec is not None or spec_set is not None:
1428                if original is DEFAULT:
1429                    raise TypeError("Can't use 'spec' with create=True")
1430                if isinstance(original, type):
1431                    # If we're patching out a class and there is a spec
1432                    inherit = True
1433            if spec is None and _is_async_obj(original):
1434                Klass = AsyncMock
1435            else:
1436                Klass = MagicMock
1437            _kwargs = {}
1438            if new_callable is not None:
1439                Klass = new_callable
1440            elif spec is not None or spec_set is not None:
1441                this_spec = spec
1442                if spec_set is not None:
1443                    this_spec = spec_set
1444                if _is_list(this_spec):
1445                    not_callable = '__call__' not in this_spec
1446                else:
1447                    not_callable = not callable(this_spec)
1448                if _is_async_obj(this_spec):
1449                    Klass = AsyncMock
1450                elif not_callable:
1451                    Klass = NonCallableMagicMock
1452
1453            if spec is not None:
1454                _kwargs['spec'] = spec
1455            if spec_set is not None:
1456                _kwargs['spec_set'] = spec_set
1457
1458            # add a name to mocks
1459            if (isinstance(Klass, type) and
1460                issubclass(Klass, NonCallableMock) and self.attribute):
1461                _kwargs['name'] = self.attribute
1462
1463            _kwargs.update(kwargs)
1464            new = Klass(**_kwargs)
1465
1466            if inherit and _is_instance_mock(new):
1467                # we can only tell if the instance should be callable if the
1468                # spec is not a list
1469                this_spec = spec
1470                if spec_set is not None:
1471                    this_spec = spec_set
1472                if (not _is_list(this_spec) and not
1473                    _instance_callable(this_spec)):
1474                    Klass = NonCallableMagicMock
1475
1476                _kwargs.pop('name')
1477                new.return_value = Klass(_new_parent=new, _new_name='()',
1478                                         **_kwargs)
1479        elif autospec is not None:
1480            # spec is ignored, new *must* be default, spec_set is treated
1481            # as a boolean. Should we check spec is not None and that spec_set
1482            # is a bool?
1483            if new is not DEFAULT:
1484                raise TypeError(
1485                    "autospec creates the mock for you. Can't specify "
1486                    "autospec and new."
1487                )
1488            if original is DEFAULT:
1489                raise TypeError("Can't use 'autospec' with create=True")
1490            spec_set = bool(spec_set)
1491            if autospec is True:
1492                autospec = original
1493
1494            new = create_autospec(autospec, spec_set=spec_set,
1495                                  _name=self.attribute, **kwargs)
1496        elif kwargs:
1497            # can't set keyword args when we aren't creating the mock
1498            # XXXX If new is a Mock we could call new.configure_mock(**kwargs)
1499            raise TypeError("Can't pass kwargs to a mock we aren't creating")
1500
1501        new_attr = new
1502
1503        self.temp_original = original
1504        self.is_local = local
1505        setattr(self.target, self.attribute, new_attr)
1506        if self.attribute_name is not None:
1507            extra_args = {}
1508            if self.new is DEFAULT:
1509                extra_args[self.attribute_name] =  new
1510            for patching in self.additional_patchers:
1511                arg = patching.__enter__()
1512                if patching.new is DEFAULT:
1513                    extra_args.update(arg)
1514            return extra_args
1515
1516        return new
1517
1518
1519    def __exit__(self, *exc_info):
1520        """Undo the patch."""
1521        if not _is_started(self):
1522            return
1523
1524        if self.is_local and self.temp_original is not DEFAULT:
1525            setattr(self.target, self.attribute, self.temp_original)
1526        else:
1527            delattr(self.target, self.attribute)
1528            if not self.create and (not hasattr(self.target, self.attribute) or
1529                        self.attribute in ('__doc__', '__module__',
1530                                           '__defaults__', '__annotations__',
1531                                           '__kwdefaults__')):
1532                # needed for proxy objects like django settings
1533                setattr(self.target, self.attribute, self.temp_original)
1534
1535        del self.temp_original
1536        del self.is_local
1537        del self.target
1538        for patcher in reversed(self.additional_patchers):
1539            if _is_started(patcher):
1540                patcher.__exit__(*exc_info)
1541
1542
1543    def start(self):
1544        """Activate a patch, returning any created mock."""
1545        result = self.__enter__()
1546        self._active_patches.append(self)
1547        return result
1548
1549
1550    def stop(self):
1551        """Stop an active patch."""
1552        try:
1553            self._active_patches.remove(self)
1554        except ValueError:
1555            # If the patch hasn't been started this will fail
1556            pass
1557
1558        return self.__exit__()
1559
1560
1561
1562def _get_target(target):
1563    try:
1564        target, attribute = target.rsplit('.', 1)
1565    except (TypeError, ValueError):
1566        raise TypeError("Need a valid target to patch. You supplied: %r" %
1567                        (target,))
1568    getter = lambda: _importer(target)
1569    return getter, attribute
1570
1571
def _patch_object(
        target, attribute, new=DEFAULT, spec=None,
        create=False, spec_set=None, autospec=None,
        new_callable=None, **kwargs
    ):
    """
    Patch the member named `attribute` on the object `target` with a mock
    object.

    `patch.object` works as a decorator, a class decorator or a context
    manager. The arguments `new`, `spec`, `create`, `spec_set`, `autospec`
    and `new_callable` mean the same as they do for `patch`, and, as with
    `patch`, any further keyword arguments are used to configure the mock
    object that is created.

    As a class decorator `patch.object` honours `patch.TEST_PREFIX` when
    choosing which methods to wrap.

    `target` has to be the object itself; passing a `str` raises
    `TypeError`.
    """
    given_a_name = type(target) is str
    if given_a_name:
        raise TypeError(
            f"{target!r} must be the actual object to be patched, not a str"
        )
    return _patch(
        lambda: target, attribute, new, spec, create,
        spec_set, autospec, new_callable, kwargs
    )
1599
1600
1601def _patch_multiple(target, spec=None, create=False, spec_set=None,
1602                    autospec=None, new_callable=None, **kwargs):
1603    """Perform multiple patches in a single call. It takes the object to be
1604    patched (either as an object or a string to fetch the object by importing)
1605    and keyword arguments for the patches::
1606
1607        with patch.multiple(settings, FIRST_PATCH='one', SECOND_PATCH='two'):
1608            ...
1609
1610    Use `DEFAULT` as the value if you want `patch.multiple` to create
1611    mocks for you. In this case the created mocks are passed into a decorated
1612    function by keyword, and a dictionary is returned when `patch.multiple` is
1613    used as a context manager.
1614
1615    `patch.multiple` can be used as a decorator, class decorator or a context
1616    manager. The arguments `spec`, `spec_set`, `create`,
1617    `autospec` and `new_callable` have the same meaning as for `patch`. These
1618    arguments will be applied to *all* patches done by `patch.multiple`.
1619
1620    When used as a class decorator `patch.multiple` honours `patch.TEST_PREFIX`
1621    for choosing which methods to wrap.
1622    """
1623    if type(target) is str:
1624        getter = lambda: _importer(target)
1625    else:
1626        getter = lambda: target
1627
1628    if not kwargs:
1629        raise ValueError(
1630            'Must supply at least one keyword argument with patch.multiple'
1631        )
1632    # need to wrap in a list for python 3, where items is a view
1633    items = list(kwargs.items())
1634    attribute, new = items[0]
1635    patcher = _patch(
1636        getter, attribute, new, spec, create, spec_set,
1637        autospec, new_callable, {}
1638    )
1639    patcher.attribute_name = attribute
1640    for attribute, new in items[1:]:
1641        this_patcher = _patch(
1642            getter, attribute, new, spec, create, spec_set,
1643            autospec, new_callable, {}
1644        )
1645        this_patcher.attribute_name = attribute
1646        patcher.additional_patchers.append(this_patcher)
1647    return patcher
1648
1649
def patch(
        target, new=DEFAULT, spec=None, create=False,
        spec_set=None, autospec=None, new_callable=None, **kwargs
    ):
    """
    Temporarily replace `target` with a `new` object.

    `patch` works as a function decorator, a class decorator or a context
    manager. While the decorated function or the body of the with statement
    runs, `target` is replaced by `new`; the patch is undone when the
    function or with statement exits.

    When `new` is omitted the replacement is an `AsyncMock` if the patched
    object is an async function, and a `MagicMock` otherwise. Used as a
    decorator with `new` omitted, the created mock is passed to the decorated
    function as an extra argument. Used as a context manager, the created
    mock is what the context manager returns.

    `target` is a string of the form `'package.module.ClassName'`. It is
    imported and the named object is replaced with `new`, so `target` must be
    importable from the environment `patch` is called from. The import happens
    when the decorated function is executed, not at decoration time.

    `spec` and `spec_set` are handed to the `MagicMock` if patch creates one.
    Passing `spec=True` or `spec_set=True` makes patch use the object being
    mocked as the spec/spec_set object.

    `new_callable` names a different class, or callable object, that is
    called to create the `new` object. The default is `AsyncMock` for async
    functions and `MagicMock` for everything else.

    `autospec` is a more powerful form of `spec`. With `autospec=True` the
    mock is created with a spec taken from the object being replaced, and
    every attribute of the mock gets the spec of the corresponding attribute
    of that object. Mocked methods and functions have their arguments checked
    and raise a `TypeError` when called with the wrong signature. For a mock
    replacing a class, the return value (the 'instance') has the same spec as
    the class. Pass `autospec=some_object` to use an arbitrary object as the
    spec instead of the one being replaced.

    By default `patch` refuses to replace attributes that don't exist. With
    `create=True`, a missing attribute is created when the patched function
    is called and deleted again afterwards. That helps when testing against
    attributes production code creates at runtime, but it is off by default
    because it is dangerous: it lets tests pass against APIs that don't
    actually exist!

    As a `TestCase` class decorator, patch decorates each test method of the
    class, which removes boilerplate when the test methods share a common set
    of patchings. Test methods are those whose names start with
    `patch.TEST_PREFIX` (by default `test`, matching how `unittest` finds
    tests); assign to `patch.TEST_PREFIX` to use another prefix.

    As a context manager the patching applies to the indented block after
    the with statement. The name after "as", if given, is bound to the
    patched object, which is very useful when `patch` creates the mock.

    Any extra keyword arguments are passed to the `Mock` (or `new_callable`)
    on construction.

    `patch.dict(...)`, `patch.multiple(...)` and `patch.object(...)` are
    available for alternate use-cases.
    """
    target_getter, attribute_name = _get_target(target)
    patcher = _patch(
        target_getter, attribute_name, new, spec, create,
        spec_set, autospec, new_callable, kwargs,
    )
    return patcher
1725
1726
1727class _patch_dict(object):
1728    """
1729    Patch a dictionary, or dictionary like object, and restore the dictionary
1730    to its original state after the test.
1731
1732    `in_dict` can be a dictionary or a mapping like container. If it is a
1733    mapping then it must at least support getting, setting and deleting items
1734    plus iterating over keys.
1735
1736    `in_dict` can also be a string specifying the name of the dictionary, which
1737    will then be fetched by importing it.
1738
1739    `values` can be a dictionary of values to set in the dictionary. `values`
1740    can also be an iterable of `(key, value)` pairs.
1741
1742    If `clear` is True then the dictionary will be cleared before the new
1743    values are set.
1744
1745    `patch.dict` can also be called with arbitrary keyword arguments to set
1746    values in the dictionary::
1747
1748        with patch.dict('sys.modules', mymodule=Mock(), other_module=Mock()):
1749            ...
1750
1751    `patch.dict` can be used as a context manager, decorator or class
1752    decorator. When used as a class decorator `patch.dict` honours
1753    `patch.TEST_PREFIX` for choosing which methods to wrap.
1754    """
1755
1756    def __init__(self, in_dict, values=(), clear=False, **kwargs):
1757        self.in_dict = in_dict
1758        # support any argument supported by dict(...) constructor
1759        self.values = dict(values)
1760        self.values.update(kwargs)
1761        self.clear = clear
1762        self._original = None
1763
1764
1765    def __call__(self, f):
1766        if isinstance(f, type):
1767            return self.decorate_class(f)
1768        @wraps(f)
1769        def _inner(*args, **kw):
1770            self._patch_dict()
1771            try:
1772                return f(*args, **kw)
1773            finally:
1774                self._unpatch_dict()
1775
1776        return _inner
1777
1778
1779    def decorate_class(self, klass):
1780        for attr in dir(klass):
1781            attr_value = getattr(klass, attr)
1782            if (attr.startswith(patch.TEST_PREFIX) and
1783                 hasattr(attr_value, "__call__")):
1784                decorator = _patch_dict(self.in_dict, self.values, self.clear)
1785                decorated = decorator(attr_value)
1786                setattr(klass, attr, decorated)
1787        return klass
1788
1789
1790    def __enter__(self):
1791        """Patch the dict."""
1792        self._patch_dict()
1793        return self.in_dict
1794
1795
1796    def _patch_dict(self):
1797        values = self.values
1798        if isinstance(self.in_dict, str):
1799            self.in_dict = _importer(self.in_dict)
1800        in_dict = self.in_dict
1801        clear = self.clear
1802
1803        try:
1804            original = in_dict.copy()
1805        except AttributeError:
1806            # dict like object with no copy method
1807            # must support iteration over keys
1808            original = {}
1809            for key in in_dict:
1810                original[key] = in_dict[key]
1811        self._original = original
1812
1813        if clear:
1814            _clear_dict(in_dict)
1815
1816        try:
1817            in_dict.update(values)
1818        except AttributeError:
1819            # dict like object with no update method
1820            for key in values:
1821                in_dict[key] = values[key]
1822
1823
1824    def _unpatch_dict(self):
1825        in_dict = self.in_dict
1826        original = self._original
1827
1828        _clear_dict(in_dict)
1829
1830        try:
1831            in_dict.update(original)
1832        except AttributeError:
1833            for key in original:
1834                in_dict[key] = original[key]
1835
1836
1837    def __exit__(self, *args):
1838        """Unpatch the dict."""
1839        self._unpatch_dict()
1840        return False
1841
1842    start = __enter__
1843    stop = __exit__
1844
1845
1846def _clear_dict(in_dict):
1847    try:
1848        in_dict.clear()
1849    except AttributeError:
1850        keys = list(in_dict)
1851        for key in keys:
1852            del in_dict[key]
1853
1854
def _patch_stopall():
    """Stop every active patch.

    The patches recorded in `_patch._active_patches` are stopped newest
    first (LIFO) so that nested patches unroll in the right order.
    """
    for active_patcher in reversed(_patch._active_patches):
        active_patcher.stop()
1859
1860
# Expose the alternate patchers as attributes of `patch`, so that they are
# reachable as `patch.object`, `patch.dict`, `patch.multiple` and
# `patch.stopall`.
patch.object = _patch_object
patch.dict = _patch_dict
patch.multiple = _patch_multiple
patch.stopall = _patch_stopall
# Name prefix that marks a method as a test when a patcher is used as a
# class decorator (see `_patch_dict.decorate_class`).
patch.TEST_PREFIX = 'test'
1866
1867magic_methods = (
1868    "lt le gt ge eq ne "
1869    "getitem setitem delitem "
1870    "len contains iter "
1871    "hash str sizeof "
1872    "enter exit "
1873    # we added divmod and rdivmod here instead of numerics
1874    # because there is no idivmod
1875    "divmod rdivmod neg pos abs invert "
1876    "complex int float index "
1877    "round trunc floor ceil "
1878    "bool next "
1879    "fspath "
1880    "aiter "
1881)
1882
1883numerics = (
1884    "add sub mul matmul div floordiv mod lshift rshift and xor or pow truediv"
1885)
1886inplace = ' '.join('i%s' % n for n in numerics.split())
1887right = ' '.join('r%s' % n for n in numerics.split())
1888
1889# not including __prepare__, __instancecheck__, __subclasscheck__
1890# (as they are metaclass methods)
1891# __del__ is not supported at all as it causes problems if it exists
1892
1893_non_defaults = {
1894    '__get__', '__set__', '__delete__', '__reversed__', '__missing__',
1895    '__reduce__', '__reduce_ex__', '__getinitargs__', '__getnewargs__',
1896    '__getstate__', '__setstate__', '__getformat__', '__setformat__',
1897    '__repr__', '__dir__', '__subclasses__', '__format__',
1898    '__getnewargs_ex__',
1899}
1900
1901
1902def _get_method(name, func):
1903    "Turns a callable object (like a mock) into a real function"
1904    def method(self, /, *args, **kw):
1905        return func(self, *args, **kw)
1906    method.__name__ = name
1907    return method
1908
1909
# Full dunder names of every entry in the four name tables above.
_magics = {
    f'__{method}__'
    for table in (magic_methods, numerics, inplace, right)
    for method in table.split()
}

# Magic methods of the async protocols (`async with`, async iteration)
_async_method_magics = {"__aenter__", "__aexit__", "__anext__"}
# Magic methods that are only used with async calls but are synchronous functions themselves
_sync_async_magics = {"__aiter__"}
_async_magics = _async_method_magics.union(_sync_async_magics)

_all_sync_magics = _magics.union(_non_defaults)
_all_magics = _all_sync_magics.union(_async_magics)
1923
# Magic methods that are deliberately left out of the supported sets above.
# NOTE(review): how this set is enforced is not visible in this part of the
# file; presumably configuring one of these on a mock is rejected.
_unsupported_magics = {
    '__getattr__', '__setattr__',
    '__init__', '__new__', '__prepare__',
    '__instancecheck__', '__subclasscheck__',
    '__del__'
}

# Magic methods whose default return value is computed from the mock itself.
# `_set_return_value` calls the function with the parent mock and stores the
# result as the method's `return_value`.
_calculate_return_value = {
    '__hash__': lambda self: object.__hash__(self),
    '__str__': lambda self: object.__str__(self),
    '__sizeof__': lambda self: object.__sizeof__(self),
    '__fspath__': lambda self: f"{type(self).__name__}/{self._extract_mock_name()}/{id(self)}",
}

# Magic methods with a fixed default return value. `_set_return_value`
# consults this table first.
_return_values = {
    '__lt__': NotImplemented,
    '__gt__': NotImplemented,
    '__le__': NotImplemented,
    '__ge__': NotImplemented,
    '__int__': 1,
    '__contains__': False,
    '__len__': 0,
    '__exit__': False,
    '__complex__': 1j,
    '__float__': 1.0,
    '__bool__': True,
    '__index__': 1,
    '__aexit__': False,
}
1953
1954
def _get_eq(self):
    """Build the default `__eq__` side effect for the mock `self`.

    A configured `__eq__` return value wins. Otherwise the mock is equal
    only to itself, and any other comparison yields `NotImplemented`.
    """
    def __eq__(other):
        configured = self.__eq__._mock_return_value
        if configured is not DEFAULT:
            return configured
        return True if self is other else NotImplemented
    return __eq__
1964
def _get_ne(self):
    """Build the default `__ne__` side effect for the mock `self`.

    With no configured return value the mock is "not unequal" to itself and
    any other comparison yields `NotImplemented`. When a return value has
    been configured the side effect returns `DEFAULT`.
    NOTE(review): presumably `DEFAULT` tells the caller to fall back to the
    configured return value; the synchronous call path is not visible here.
    """
    def __ne__(other):
        if self.__ne__._mock_return_value is DEFAULT:
            return False if self is other else NotImplemented
        return DEFAULT
    return __ne__
1973
def _get_iter(self):
    """Build the default `__iter__` side effect for the mock `self`.

    Iterates over the configured `__iter__` return value, or over nothing
    when none has been configured.
    """
    def __iter__():
        configured = self.__iter__._mock_return_value
        # iter() hands an existing iterator back unchanged, so a configured
        # iterator is not wrapped a second time
        source = [] if configured is DEFAULT else configured
        return iter(source)
    return __iter__
1983
def _get_async_iter(self):
    """Build the default `__aiter__` side effect for the mock `self`.

    Wraps an iterator over the configured `__aiter__` return value (or over
    nothing when none has been configured) in an `_AsyncIterator`.
    """
    def __aiter__():
        configured = self.__aiter__._mock_return_value
        source = [] if configured is DEFAULT else configured
        return _AsyncIterator(iter(source))
    return __aiter__
1991
# Magic methods whose default behaviour is a side effect rather than a fixed
# return value. Each entry is a factory: `_set_return_value` calls it with
# the parent mock and installs the result as the method's `side_effect`.
_side_effect_methods = {
    '__eq__': _get_eq,
    '__ne__': _get_ne,
    '__iter__': _get_iter,
    '__aiter__': _get_async_iter
}
1998
1999
2000
def _set_return_value(mock, method, name):
    """Give the magic-method mock `method` its default behaviour.

    `name` is the magic method's name and `mock` is the mock it belongs to.
    The first matching source wins:

    1. a fixed value from `_return_values`,
    2. a value computed from `mock` by `_calculate_return_value`,
    3. a side effect built for `mock` by `_side_effect_methods`.

    `method` is left untouched when `name` appears in none of the tables.
    """
    preset = _return_values.get(name, DEFAULT)
    if preset is not DEFAULT:
        method.return_value = preset
        return

    compute = _calculate_return_value.get(name)
    if compute is not None:
        method.return_value = compute(mock)
        return

    make_side_effect = _side_effect_methods.get(name)
    if make_side_effect is None:
        return
    method.side_effect = make_side_effect(mock)
2016
2017
2018
class MagicMixin(Base):
    """Mixin that installs default magic methods on a mock's class."""

    def __init__(self, /, *args, **kw):
        # First pass: make magic work for kwargs in init
        self._mock_set_magics()
        _safe_super(MagicMixin, self).__init__(*args, **kw)
        # Second pass: fix magic broken by upper level init
        self._mock_set_magics()


    def _mock_set_magics(self):
        """Install a `MagicProxy` for every supported magic method.

        When the mock has a method list (`_mock_methods` is not None) only
        the magic methods named in it are kept; the other ones already
        present on the mock's class are deleted. Names that already exist on
        the class are never overwritten, so calling this again is safe.
        """
        candidates = _magics | _async_method_magics
        wanted = candidates
        cls = type(self)

        allowed_methods = getattr(self, "_mock_methods", None)
        if allowed_methods is not None:
            wanted = candidates.intersection(allowed_methods)

            for entry in candidates - wanted:
                if entry in cls.__dict__:
                    # remove unneeded magic methods
                    delattr(self, entry)

        # don't overwrite existing attributes if called a second time
        for entry in wanted - set(cls.__dict__):
            setattr(cls, entry, MagicProxy(entry, self))
2047
2048
2049
class NonCallableMagicMock(MagicMixin, NonCallableMock):
    """A version of `MagicMock` that isn't callable."""
    def mock_add_spec(self, spec, spec_set=False):
        """Add a spec to a mock.

        `spec` can either be an object or a list of strings. Only attributes
        on the `spec` can be fetched as attributes from the mock.

        If `spec_set` is True then only attributes on the spec can be set.
        """
        self._mock_add_spec(spec, spec_set)
        # Re-run the magic method setup after the spec has been added.
        # NOTE(review): presumably so the installed magic methods match the
        # new spec; `_mock_add_spec` is not visible here.
        self._mock_set_magics()
2060
2061
class AsyncMagicMixin(MagicMixin):
    # Same two-pass initialisation as `MagicMixin.__init__`, with the
    # super() call anchored on this class.
    def __init__(self, /, *args, **kw):
        self._mock_set_magics()  # make magic work for kwargs in init
        _safe_super(AsyncMagicMixin, self).__init__(*args, **kw)
        self._mock_set_magics()  # fix magic broken by upper level init
2067
class MagicMock(MagicMixin, Mock):
    """
    MagicMock is a subclass of Mock with default implementations
    of most of the magic methods. You can use MagicMock without having to
    configure the magic methods yourself.

    If you use the `spec` or `spec_set` arguments then *only* magic
    methods that exist in the spec will be created.

    Attributes and the return value of a `MagicMock` will also be `MagicMocks`.
    """
    def mock_add_spec(self, spec, spec_set=False):
        """Add a spec to a mock.

        `spec` can either be an object or a list of strings. Only attributes
        on the `spec` can be fetched as attributes from the mock.

        If `spec_set` is True then only attributes on the spec can be set.
        """
        self._mock_add_spec(spec, spec_set)
        # Re-run the magic method setup after the spec has been added.
        # NOTE(review): presumably so the installed magic methods match the
        # new spec; `_mock_add_spec` is not visible here.
        self._mock_set_magics()
2087
2088
2089
class MagicProxy(Base):
    """Descriptor that creates the mock for one magic method on first access."""

    def __init__(self, name, parent):
        self.name = name
        self.parent = parent

    def create_mock(self):
        """Create the child mock for this magic method, attach it to the
        parent under the method's name, give it its default behaviour and
        return it."""
        owner = self.parent
        magic_name = self.name
        child = owner._get_child_mock(
            name=magic_name, _new_name=magic_name, _new_parent=owner,
        )
        setattr(owner, magic_name, child)
        _set_return_value(owner, child, magic_name)
        return child

    def __get__(self, obj, _type=None):
        # Any access through the descriptor creates the mock; `obj` and
        # `_type` are not consulted.
        return self.create_mock()
2106
2107
class AsyncMockMixin(Base):
    """Mixin that adds await tracking and await assertions to a mock."""
    await_count = _delegating_property('await_count')
    await_args = _delegating_property('await_args')
    await_args_list = _delegating_property('await_args_list')

    def __init__(self, /, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # asyncio.iscoroutinefunction() checks _is_coroutine property to say if an
        # object is a coroutine. Without this check it looks to see if it is a
        # function/method, which in this case it is not (since it is an
        # AsyncMock).
        # It is set through __dict__ because when spec_set is True, this
        # attribute is likely undefined.
        self.__dict__['_is_coroutine'] = asyncio.coroutines._is_coroutine
        self.__dict__['_mock_await_count'] = 0
        self.__dict__['_mock_await_args'] = None
        self.__dict__['_mock_await_args_list'] = _CallList()
        # Stand-in code object whose flags carry the coroutine bit.
        code_mock = NonCallableMock(spec_set=CodeType)
        code_mock.co_flags = inspect.CO_COROUTINE
        self.__dict__['__code__'] = code_mock

    async def _execute_mock_call(self, /, *args, **kwargs):
        """Record the await, then produce the outcome of the call.

        The outcome is decided in this order: `side_effect` (an exception is
        raised, an iterator yields its next value, a callable is called and
        awaited if it is a coroutine function), then an explicitly set
        return value, then the wrapped object, then the default
        `return_value`. A side effect that yields `DEFAULT` falls through to
        the later steps.
        """
        # This is nearly just like super(), except for special handling
        # of coroutines

        # Record the arguments of the call that is actually being awaited.
        # `self.call_args` only remembers the most recent call, which is the
        # wrong one whenever several coroutines are created before any of
        # them is awaited.
        _call = _Call((args, kwargs), two=True)
        self.await_count += 1
        self.await_args = _call
        self.await_args_list.append(_call)

        effect = self.side_effect
        if effect is not None:
            if _is_exception(effect):
                raise effect
            elif not _callable(effect):
                try:
                    result = next(effect)
                except StopIteration:
                    # It is impossible to propagate a StopIteration
                    # through coroutines because of PEP 479
                    raise StopAsyncIteration
                if _is_exception(result):
                    raise result
            elif asyncio.iscoroutinefunction(effect):
                result = await effect(*args, **kwargs)
            else:
                result = effect(*args, **kwargs)

            if result is not DEFAULT:
                return result

        if self._mock_return_value is not DEFAULT:
            return self.return_value

        if self._mock_wraps is not None:
            if asyncio.iscoroutinefunction(self._mock_wraps):
                return await self._mock_wraps(*args, **kwargs)
            return self._mock_wraps(*args, **kwargs)

        return self.return_value

    def assert_awaited(self):
        """
        Assert that the mock was awaited at least once.
        """
        if self.await_count == 0:
            msg = f"Expected {self._mock_name or 'mock'} to have been awaited."
            raise AssertionError(msg)

    def assert_awaited_once(self):
        """
        Assert that the mock was awaited exactly once.
        """
        if not self.await_count == 1:
            msg = (f"Expected {self._mock_name or 'mock'} to have been awaited once."
                   f" Awaited {self.await_count} times.")
            raise AssertionError(msg)

    def assert_awaited_with(self, /, *args, **kwargs):
        """
        Assert that the last await was with the specified arguments.
        """
        if self.await_args is None:
            expected = self._format_mock_call_signature(args, kwargs)
            raise AssertionError(f'Expected await: {expected}\nNot awaited')

        def _error_message():
            msg = self._format_mock_failure_message(args, kwargs, action='await')
            return msg

        expected = self._call_matcher((args, kwargs))
        actual = self._call_matcher(self.await_args)
        if expected != actual:
            # chain the matcher's exception, if it produced one, as the cause
            cause = expected if isinstance(expected, Exception) else None
            raise AssertionError(_error_message()) from cause

    def assert_awaited_once_with(self, /, *args, **kwargs):
        """
        Assert that the mock was awaited exactly once and with the specified
        arguments.
        """
        if not self.await_count == 1:
            msg = (f"Expected {self._mock_name or 'mock'} to have been awaited once."
                   f" Awaited {self.await_count} times.")
            raise AssertionError(msg)
        return self.assert_awaited_with(*args, **kwargs)

    def assert_any_await(self, /, *args, **kwargs):
        """
        Assert the mock has ever been awaited with the specified arguments.
        """
        expected = self._call_matcher((args, kwargs))
        actual = [self._call_matcher(c) for c in self.await_args_list]
        if expected not in actual:
            # chain the matcher's exception, if it produced one, as the cause
            cause = expected if isinstance(expected, Exception) else None
            expected_string = self._format_mock_call_signature(args, kwargs)
            raise AssertionError(
                '%s await not found' % expected_string
            ) from cause

    def assert_has_awaits(self, calls, any_order=False):
        """
        Assert the mock has been awaited with the specified calls.
        The :attr:`await_args_list` list is checked for the awaits.

        If `any_order` is False (the default) then the awaits must be
        sequential. There can be extra calls before or after the
        specified awaits.

        If `any_order` is True then the awaits can be in any order, but
        they must all appear in :attr:`await_args_list`.
        """
        expected = [self._call_matcher(c) for c in calls]
        # the first expected call that failed to match, if any, becomes the
        # cause of the assertion error
        cause = next((e for e in expected if isinstance(e, Exception)), None)
        all_awaits = _CallList(self._call_matcher(c) for c in self.await_args_list)
        if not any_order:
            if expected not in all_awaits:
                if cause is None:
                    problem = 'Awaits not found.'
                else:
                    problem = ('Error processing expected awaits.\n'
                               'Errors: {}').format(
                                   [e if isinstance(e, Exception) else None
                                    for e in expected])
                raise AssertionError(
                    f'{problem}\n'
                    f'Expected: {_CallList(calls)}\n'
                    f'Actual: {self.await_args_list}'
                ) from cause
            return

        all_awaits = list(all_awaits)

        # consume one recorded await per expected call, so duplicates in
        # `calls` need as many matching awaits
        not_found = []
        for kall in expected:
            try:
                all_awaits.remove(kall)
            except ValueError:
                not_found.append(kall)
        if not_found:
            raise AssertionError(
                '%r not all found in await list' % (tuple(not_found),)
            ) from cause

    def assert_not_awaited(self):
        """
        Assert that the mock was never awaited.
        """
        if self.await_count != 0:
            msg = (f"Expected {self._mock_name or 'mock'} to not have been awaited."
                   f" Awaited {self.await_count} times.")
            raise AssertionError(msg)

    def reset_mock(self, /, *args, **kwargs):
        """
        See :func:`.Mock.reset_mock()`

        Also clears the await count, the last await arguments and the list
        of awaits.
        """
        super().reset_mock(*args, **kwargs)
        self.await_count = 0
        self.await_args = None
        self.await_args_list = _CallList()
2289
2290
class AsyncMock(AsyncMockMixin, AsyncMagicMixin, Mock):
    """
    Enhance :class:`Mock` with features allowing to mock
    an async function.

    The :class:`AsyncMock` object will behave so the object is
    recognized as an async function, and the result of a call is an awaitable:

    >>> mock = AsyncMock()
    >>> asyncio.iscoroutinefunction(mock)
    True
    >>> inspect.isawaitable(mock())
    True


    The result of ``mock()`` is an async function which will have the outcome
    of ``side_effect`` or ``return_value``:

    - if ``side_effect`` is a function, the async function will return the
      result of that function,
    - if ``side_effect`` is an exception, the async function will raise the
      exception,
    - if ``side_effect`` is an iterable, the async function will return the
      next value of the iterable, however, if the sequence of result is
      exhausted, ``StopAsyncIteration`` is raised immediately,
    - if ``side_effect`` is not defined, the async function will return the
      value defined by ``return_value``, hence, by default, the async function
      returns a new :class:`AsyncMock` object.

    If the outcome of ``side_effect`` or ``return_value`` is an async function,
    the mock async function obtained when the mock object is called will be this
    async function itself (and not an async function returning an async
    function).

    The test author can also specify a wrapped object with ``wraps``. In this
    case, the :class:`Mock` object behavior is the same as with an
    :class:`.Mock` object: the wrapped object may have methods
    defined as async functions.

    Based on Martin Richard's asynctest project.
    """
2332
2333
class _ANY(object):
    """A helper object that compares equal to everything.

    `==` against any object is True and `!=` is False, regardless of the
    other operand.
    """

    def __eq__(self, other):
        return True

    def __ne__(self, other):
        return False

    def __repr__(self):
        return '<ANY>'

# The shared instance exported as `ANY`.
ANY = _ANY()
2347
2348
2349
2350def _format_call_signature(name, args, kwargs):
2351    message = '%s(%%s)' % name
2352    formatted_args = ''
2353    args_string = ', '.join([repr(arg) for arg in args])
2354    kwargs_string = ', '.join([
2355        '%s=%r' % (key, value) for key, value in kwargs.items()
2356    ])
2357    if args_string:
2358        formatted_args = args_string
2359    if kwargs_string:
2360        if formatted_args:
2361            formatted_args += ', '
2362        formatted_args += kwargs_string
2363
2364    return message % formatted_args
2365
2366
2367
2368class _Call(tuple):
2369    """
2370    A tuple for holding the results of a call to a mock, either in the form
2371    `(args, kwargs)` or `(name, args, kwargs)`.
2372
2373    If args or kwargs are empty then a call tuple will compare equal to
2374    a tuple without those values. This makes comparisons less verbose::
2375
2376        _Call(('name', (), {})) == ('name',)
2377        _Call(('name', (1,), {})) == ('name', (1,))
2378        _Call(((), {'a': 'b'})) == ({'a': 'b'},)
2379
2380    The `_Call` object provides a useful shortcut for comparing with call::
2381
2382        _Call(((1, 2), {'a': 3})) == call(1, 2, a=3)
2383        _Call(('foo', (1, 2), {'a': 3})) == call.foo(1, 2, a=3)
2384
2385    If the _Call has no name then it will match any name.
2386    """
2387    def __new__(cls, value=(), name='', parent=None, two=False,
2388                from_kall=True):
2389        args = ()
2390        kwargs = {}
2391        _len = len(value)
2392        if _len == 3:
2393            name, args, kwargs = value
2394        elif _len == 2:
2395            first, second = value
2396            if isinstance(first, str):
2397                name = first
2398                if isinstance(second, tuple):
2399                    args = second
2400                else:
2401                    kwargs = second
2402            else:
2403                args, kwargs = first, second
2404        elif _len == 1:
2405            value, = value
2406            if isinstance(value, str):
2407                name = value
2408            elif isinstance(value, tuple):
2409                args = value
2410            else:
2411                kwargs = value
2412
2413        if two:
2414            return tuple.__new__(cls, (args, kwargs))
2415
2416        return tuple.__new__(cls, (name, args, kwargs))
2417
2418
2419    def __init__(self, value=(), name=None, parent=None, two=False,
2420                 from_kall=True):
2421        self._mock_name = name
2422        self._mock_parent = parent
2423        self._mock_from_kall = from_kall
2424
2425
2426    def __eq__(self, other):
2427        if other is ANY:
2428            return True
2429        try:
2430            len_other = len(other)
2431        except TypeError:
2432            return False
2433
2434        self_name = ''
2435        if len(self) == 2:
2436            self_args, self_kwargs = self
2437        else:
2438            self_name, self_args, self_kwargs = self
2439
2440        if (getattr(self, '_mock_parent', None) and getattr(other, '_mock_parent', None)
2441                and self._mock_parent != other._mock_parent):
2442            return False
2443
2444        other_name = ''
2445        if len_other == 0:
2446            other_args, other_kwargs = (), {}
2447        elif len_other == 3:
2448            other_name, other_args, other_kwargs = other
2449        elif len_other == 1:
2450            value, = other
2451            if isinstance(value, tuple):
2452                other_args = value
2453                other_kwargs = {}
2454            elif isinstance(value, str):
2455                other_name = value
2456                other_args, other_kwargs = (), {}
2457            else:
2458                other_args = ()
2459                other_kwargs = value
2460        elif len_other == 2:
2461            # could be (name, args) or (name, kwargs) or (args, kwargs)
2462            first, second = other
2463            if isinstance(first, str):
2464                other_name = first
2465                if isinstance(second, tuple):
2466                    other_args, other_kwargs = second, {}
2467                else:
2468                    other_args, other_kwargs = (), second
2469            else:
2470                other_args, other_kwargs = first, second
2471        else:
2472            return False
2473
2474        if self_name and other_name != self_name:
2475            return False
2476
2477        # this order is important for ANY to work!
2478        return (other_args, other_kwargs) == (self_args, self_kwargs)
2479
2480
2481    __ne__ = object.__ne__
2482
2483
2484    def __call__(self, /, *args, **kwargs):
2485        if self._mock_name is None:
2486            return _Call(('', args, kwargs), name='()')
2487
2488        name = self._mock_name + '()'
2489        return _Call((self._mock_name, args, kwargs), name=name, parent=self)
2490
2491
2492    def __getattr__(self, attr):
2493        if self._mock_name is None:
2494            return _Call(name=attr, from_kall=False)
2495        name = '%s.%s' % (self._mock_name, attr)
2496        return _Call(name=name, parent=self, from_kall=False)
2497
2498
2499    def __getattribute__(self, attr):
2500        if attr in tuple.__dict__:
2501            raise AttributeError
2502        return tuple.__getattribute__(self, attr)
2503
2504
2505    def count(self, /, *args, **kwargs):
2506        return self.__getattr__('count')(*args, **kwargs)
2507
2508    def index(self, /, *args, **kwargs):
2509        return self.__getattr__('index')(*args, **kwargs)
2510
2511    def _get_call_arguments(self):
2512        if len(self) == 2:
2513            args, kwargs = self
2514        else:
2515            name, args, kwargs = self
2516
2517        return args, kwargs
2518
2519    @property
2520    def args(self):
2521        return self._get_call_arguments()[0]
2522
2523    @property
2524    def kwargs(self):
2525        return self._get_call_arguments()[1]
2526
2527    def __repr__(self):
2528        if not self._mock_from_kall:
2529            name = self._mock_name or 'call'
2530            if name.startswith('()'):
2531                name = 'call%s' % name
2532            return name
2533
2534        if len(self) == 2:
2535            name = 'call'
2536            args, kwargs = self
2537        else:
2538            name, args, kwargs = self
2539            if not name:
2540                name = 'call'
2541            elif not name.startswith('()'):
2542                name = 'call.%s' % name
2543            else:
2544                name = 'call%s' % name
2545        return _format_call_signature(name, args, kwargs)
2546
2547
2548    def call_list(self):
2549        """For a call object that represents multiple calls, `call_list`
2550        returns a list of all the intermediate calls as well as the
2551        final call."""
2552        vals = []
2553        thing = self
2554        while thing is not None:
2555            if thing._mock_from_kall:
2556                vals.append(thing)
2557            thing = thing._mock_parent
2558        return _CallList(reversed(vals))
2559
2560
# Public `call` helper (listed in `__all__`): a root `_Call` created with
# `from_kall=False`, i.e. not itself the result of a call.  Attribute access
# and calls on it produce further `_Call` objects.
call = _Call(from_kall=False)
2562
2563
def create_autospec(spec, spec_set=False, instance=False, _parent=None,
                    _name=None, **kwargs):
    """Build a mock whose API mirrors `spec`.

    Each non-magic attribute found on `spec` is registered on the mock with
    that attribute as its own spec.  Mocked functions and methods check that
    they are called with a signature matching the original.

    With `spec_set` true, setting an attribute that does not exist on `spec`
    raises `AttributeError`.

    When `spec` is a class, the mock's return value (the "instance") is
    autospecced from the same class.  Pass `instance=True` to use a class as
    the spec of an instance object; the resulting mock is callable only if
    instances of the class are callable.  `RuntimeError` is raised when
    `instance` is true and `spec` is an async function.

    Any additional keyword arguments are forwarded to the constructor of the
    created mock.  `_parent` and `_name` are set by the recursive call made
    for a class's return value."""
    if _is_list(spec):
        # A list instance handed to the mock constructor would be read as a
        # list of attribute names, so spec against its type instead.
        spec = type(spec)

    is_type = isinstance(spec, type)
    is_async_func = _is_async_func(spec)

    if spec_set:
        ctor_kwargs = {'spec_set': spec}
    elif spec is None:
        # None is mocked with a plain mock that has no spec.
        ctor_kwargs = {}
    else:
        ctor_kwargs = {'spec': spec}
    if ctor_kwargs and instance:
        ctor_kwargs['_spec_as_instance'] = True

    ctor_kwargs.update(kwargs)

    mock_class = MagicMock
    if inspect.isdatadescriptor(spec):
        # Descriptors get no spec because the type they return is unknown.
        ctor_kwargs = {}
    elif is_async_func:
        if instance:
            raise RuntimeError("Instance can not be True when create_autospec "
                               "is mocking an async function")
        mock_class = AsyncMock
    elif not _callable(spec) or (
            is_type and instance and not _instance_callable(spec)):
        mock_class = NonCallableMagicMock

    # An explicit `name` keyword wins over the internally supplied `_name`.
    mock_name = ctor_kwargs.pop('name', _name)

    # A top level object must not be given a _new_name.
    new_name = '' if _parent is None else mock_name

    mock = mock_class(parent=_parent, _new_parent=_parent,
                      _new_name=new_name, name=mock_name, **ctor_kwargs)

    if isinstance(spec, FunctionTypes):
        # Only expected at the top level, since functions are not recursed
        # into.
        mock = _set_signature(mock, spec)
        if is_async_func:
            _setup_async_mock(mock)
    else:
        _check_signature(spec, mock, is_type, instance)

    if not (_parent is None or instance):
        _parent._mock_children[mock_name] = mock

    if is_type and not instance and 'return_value' not in kwargs:
        mock.return_value = create_autospec(spec, spec_set, instance=True,
                                            _name='()', _parent=mock)

    for attr_name in dir(spec):
        if _is_magic(attr_name):
            # MagicMock already provides the useful magic methods.
            continue

        # Fetching the attribute may execute code (properties, dynamically
        # provided attributes).  That is accepted: the real object is what
        # gets used as the child's spec.  Only AttributeError is tolerated
        # here; any other exception raised by the fetch propagates.
        try:
            attr_value = getattr(spec, attr_name)
        except AttributeError:
            continue

        if isinstance(attr_value, FunctionTypes):
            if spec_set:
                child_kwargs = {'spec_set': attr_value}
            else:
                child_kwargs = {'spec': attr_value}

            owner = mock.mock if isinstance(spec, FunctionTypes) else mock

            eat_self = _must_skip(spec, attr_name, is_type)
            child_kwargs['_eat_self'] = eat_self
            if asyncio.iscoroutinefunction(attr_value):
                child_class = AsyncMock
            else:
                child_class = MagicMock
            child = child_class(parent=owner, name=attr_name,
                                _new_name=attr_name, _new_parent=owner,
                                **child_kwargs)
            mock._mock_children[attr_name] = child
            _check_signature(attr_value, child, skipfirst=eat_self)
        else:
            # NOTE(review): `instance` is passed positionally and therefore
            # lands in `_SpecState.__init__`'s `ids` parameter, not its
            # `instance` parameter -- confirm this is intended.
            child = _SpecState(attr_value, spec_set, mock, attr_name, instance)
            mock._mock_children[attr_name] = child

        # Children that pass as functions are also set as instance
        # attributes, in addition to living in the parent's _mock_children.
        if isinstance(child, FunctionTypes):
            setattr(mock, attr_name, child)

    return mock
2693
2694
2695def _must_skip(spec, entry, is_type):
2696    """
2697    Return whether we should skip the first argument on spec's `entry`
2698    attribute.
2699    """
2700    if not isinstance(spec, type):
2701        if entry in getattr(spec, '__dict__', {}):
2702            # instance attribute - shouldn't skip
2703            return False
2704        spec = spec.__class__
2705
2706    for klass in spec.__mro__:
2707        result = klass.__dict__.get(entry, DEFAULT)
2708        if result is DEFAULT:
2709            continue
2710        if isinstance(result, (staticmethod, classmethod)):
2711            return False
2712        elif isinstance(getattr(result, '__get__', None), MethodWrapperTypes):
2713            # Normal method => skip if looked up on type
2714            # (if looked up on instance, self is already skipped)
2715            return is_type
2716        else:
2717            return False
2718
2719    # function is a dynamically provided attribute
2720    return is_type
2721
2722
2723class _SpecState(object):
2724
2725    def __init__(self, spec, spec_set=False, parent=None,
2726                 name=None, ids=None, instance=False):
2727        self.spec = spec
2728        self.ids = ids
2729        self.spec_set = spec_set
2730        self.parent = parent
2731        self.instance = instance
2732        self.name = name
2733
2734
# Types that `create_autospec` treats as "functions" in its isinstance
# checks.
FunctionTypes = (
    # python function
    type(create_autospec),
    # instance method
    type(ANY.__eq__),
)

# Type of the `__get__` attribute of a bound method; `_must_skip` uses it to
# recognise normal methods found in a class `__dict__`.
MethodWrapperTypes = (
    type(ANY.__eq__.__get__),
)


# Attribute names used as the spec for `mock_open` file handles.  Starts as
# None and is filled in by `mock_open` the first time it runs.
file_spec = None
2748
2749
2750def _to_stream(read_data):
2751    if isinstance(read_data, bytes):
2752        return io.BytesIO(read_data)
2753    else:
2754        return io.StringIO(read_data)
2755
2756
def mock_open(mock=None, read_data=''):
    """
    Create a mock that can stand in for `open`, whether `open` is called
    directly or used as a context manager.

    `mock` is the mock object to configure.  When it is `None` (the default)
    a `MagicMock` specced on `open` is created.  The file handle it returns
    is a `MagicMock` whose API is limited to the names available on standard
    file handles.

    `read_data` is the data served by the handle's `read`, `readline` and
    `readlines`; it is wrapped with `_to_stream`.  It defaults to an empty
    string.
    """
    global file_spec

    # Current data stream and the generator currently installed as the
    # handle's readline side effect; both are replaced on every "open".
    stream = _to_stream(read_data)
    readline_source = None

    def _read_side_effect(*args, **kwargs):
        canned = handle.read.return_value
        if canned is None:
            return stream.read(*args, **kwargs)
        return canned

    def _readlines_side_effect(*args, **kwargs):
        canned = handle.readlines.return_value
        if canned is None:
            return stream.readlines(*args, **kwargs)
        return canned

    def _iter_side_effect():
        if handle.readline.return_value is not None:
            # Re-read the attribute on every step so later changes to the
            # configured return value are honoured.
            while True:
                yield handle.readline.return_value
        # Deliberately a plain loop: delegating with `yield from` would
        # close the stream whenever this generator is closed.
        for line in stream:
            yield line

    def _readline_side_effect(*args, **kwargs):
        yield from _iter_side_effect()
        while True:
            yield stream.readline(*args, **kwargs)

    def _next_side_effect():
        canned = handle.readline.return_value
        if canned is None:
            return next(stream)
        return canned

    def reset_data(*args, **kwargs):
        nonlocal stream, readline_source
        stream = _to_stream(read_data)
        if handle.readline.side_effect == readline_source:
            # Only reset the side effect if the user hasn't overridden it.
            readline_source = _readline_side_effect()
            handle.readline.side_effect = readline_source
        return DEFAULT

    if file_spec is None:
        import _io
        text_api = set(dir(_io.TextIOWrapper))
        binary_api = set(dir(_io.BytesIO))
        file_spec = list(text_api | binary_api)

    if mock is None:
        mock = MagicMock(name='open', spec=open)

    handle = MagicMock(spec=file_spec)
    handle.__enter__.return_value = handle

    # None marks "no user-configured return value" for the side effects.
    for method_name in ('write', 'read', 'readline', 'readlines'):
        getattr(handle, method_name).return_value = None

    handle.read.side_effect = _read_side_effect
    readline_source = _readline_side_effect()
    handle.readline.side_effect = readline_source
    handle.readlines.side_effect = _readlines_side_effect
    handle.__iter__.side_effect = _iter_side_effect
    handle.__next__.side_effect = _next_side_effect

    mock.side_effect = reset_data
    mock.return_value = handle
    return mock
2833
2834
class PropertyMock(Mock):
    """
    A mock meant to be installed on a class as a property or other
    descriptor.  It implements `__get__` and `__set__`, so a return value can
    be configured for when the attribute is fetched.

    Fetching a `PropertyMock` from an object calls the mock with no
    arguments; assigning to it calls the mock with the assigned value.
    """

    def _get_child_mock(self, /, **kwargs):
        """Create child mocks as `MagicMock` instances."""
        return MagicMock(**kwargs)

    def __get__(self, obj, obj_type=None):
        """Descriptor read: call the mock with no arguments and return the
        result."""
        return self()

    def __set__(self, obj, val):
        """Descriptor write: call the mock with the value being assigned."""
        self(val)
2851
2852
def seal(mock):
    """Turn off automatic creation of child mocks.

    Marks `mock` as sealed and then recursively seals every attribute that
    is itself a mock and whose `_mock_new_parent` is `mock`.  Attributes
    listed by `dir()` whose lookup raises `AttributeError` are skipped.
    """
    mock._mock_sealed = True
    for attr_name in dir(mock):
        try:
            child = getattr(mock, attr_name)
        except AttributeError:
            continue
        # Only descend into mocks that belong to this mock.
        if isinstance(child, NonCallableMock) and child._mock_new_parent is mock:
            seal(child)
2873
2874
2875class _AsyncIterator:
2876    """
2877    Wraps an iterator in an asynchronous iterator.
2878    """
2879    def __init__(self, iterator):
2880        self.iterator = iterator
2881        code_mock = NonCallableMock(spec_set=CodeType)
2882        code_mock.co_flags = inspect.CO_ITERABLE_COROUTINE
2883        self.__dict__['__code__'] = code_mock
2884
2885    def __aiter__(self):
2886        return self
2887
2888    async def __anext__(self):
2889        try:
2890            return next(self.iterator)
2891        except StopIteration:
2892            pass
2893        raise StopAsyncIteration
2894