• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# mock.py
2# Test tools for mocking and patching.
3# Maintained by Michael Foord
4# Backport for other versions of Python available from
5# https://pypi.org/project/mock
6
# Names exported by a star-import of this module.
__all__ = (
    'Mock',
    'MagicMock',
    'patch',
    'sentinel',
    'DEFAULT',
    'ANY',
    'call',
    'create_autospec',
    'AsyncMock',
    'ThreadingMock',
    'FILTER_DIR',
    'NonCallableMock',
    'NonCallableMagicMock',
    'mock_open',
    'PropertyMock',
    'seal',
)
25
26
27import asyncio
28import contextlib
29import io
30import inspect
31import pprint
32import sys
33import builtins
34import pkgutil
35from asyncio import iscoroutinefunction
36import threading
37from types import CodeType, ModuleType, MethodType
38from unittest.util import safe_repr
39from functools import wraps, partial
40from threading import RLock
41
42
class InvalidSpecError(Exception):
    """Indicates that an invalid value was used as a mock spec.

    Raised in this module when a mock object is itself passed as a spec.
    """
45
46
# Every name in the builtins module that does not start with an underscore.
_builtins = {name for name in dir(builtins) if not name.startswith('_')}

# When false, NonCallableMock.__dir__ returns the plain object.__dir__()
# listing instead of its filtered one.
FILTER_DIR = True

# Workaround for issue #12370
# Without this, the __class__ properties wouldn't be set correctly
_safe_super = super
54
def _is_async_obj(obj):
    """Tell whether *obj* is a coroutine function or an awaitable.

    Mock instances that are not ``AsyncMock`` instances are never async.
    If *obj* exposes ``__func__``, that underlying object is the one
    inspected.
    """
    if _is_instance_mock(obj) and not isinstance(obj, AsyncMock):
        return False
    target = getattr(obj, '__func__') if hasattr(obj, '__func__') else obj
    return iscoroutinefunction(target) or inspect.isawaitable(target)
61
62
63def _is_async_func(func):
64    if getattr(func, '__code__', None):
65        return iscoroutinefunction(func)
66    else:
67        return False
68
69
def _is_instance_mock(obj):
    """Return True if *obj* is an instance of a NonCallableMock subclass."""
    # isinstance() is unreliable here because mocks override __class__;
    # inspect the real type instead. NonCallableMock is the base class of
    # all mocks.
    real_type = type(obj)
    return issubclass(real_type, NonCallableMock)
74
75
76def _is_exception(obj):
77    return (
78        isinstance(obj, BaseException) or
79        isinstance(obj, type) and issubclass(obj, BaseException)
80    )
81
82
def _extract_mock(obj):
    """Return the mock behind *obj*, or *obj* itself.

    Autospecced functions are plain function objects that carry the real
    mock in a ``mock`` attribute; for those, that attribute is returned.
    """
    is_autospecced_function = (
        isinstance(obj, FunctionTypes) and hasattr(obj, 'mock')
    )
    return obj.mock if is_autospecced_function else obj
90
91
92def _get_signature_object(func, as_instance, eat_self):
93    """
94    Given an arbitrary, possibly callable object, try to create a suitable
95    signature object.
96    Return a (reduced func, signature) tuple, or None.
97    """
98    if isinstance(func, type) and not as_instance:
99        # If it's a type and should be modelled as a type, use __init__.
100        func = func.__init__
101        # Skip the `self` argument in __init__
102        eat_self = True
103    elif isinstance(func, (classmethod, staticmethod)):
104        if isinstance(func, classmethod):
105            # Skip the `cls` argument of a class method
106            eat_self = True
107        # Use the original decorated method to extract the correct function signature
108        func = func.__func__
109    elif not isinstance(func, FunctionTypes):
110        # If we really want to model an instance of the passed type,
111        # __call__ should be looked up, not __init__.
112        try:
113            func = func.__call__
114        except AttributeError:
115            return None
116    if eat_self:
117        sig_func = partial(func, None)
118    else:
119        sig_func = func
120    try:
121        return func, inspect.signature(sig_func)
122    except ValueError:
123        # Certain callable types are not supported by inspect.signature()
124        return None
125
126
def _check_signature(func, mock, skipfirst, instance=False):
    """Install a signature checker for *func* on the type of *mock*.

    Does nothing when no signature can be obtained for *func*. Otherwise
    ``type(mock)`` receives a ``_mock_check_sig`` callable that binds its
    arguments against the signature, and a matching ``__signature__``.
    """
    found = _get_signature_object(func, instance, skipfirst)
    if found is None:
        return
    func, signature = found

    def checksig(self, /, *args, **kwargs):
        signature.bind(*args, **kwargs)

    _copy_func_details(func, checksig)
    mock_type = type(mock)
    mock_type._mock_check_sig = checksig
    mock_type.__signature__ = signature
137
138
139def _copy_func_details(func, funcopy):
140    # we explicitly don't copy func.__dict__ into this copy as it would
141    # expose original attributes that should be mocked
142    for attribute in (
143        '__name__', '__doc__', '__text_signature__',
144        '__module__', '__defaults__', '__kwdefaults__',
145    ):
146        try:
147            setattr(funcopy, attribute, getattr(func, attribute))
148        except AttributeError:
149            pass
150
151
152def _callable(obj):
153    if isinstance(obj, type):
154        return True
155    if isinstance(obj, (staticmethod, classmethod, MethodType)):
156        return _callable(obj.__func__)
157    if getattr(obj, '__call__', None) is not None:
158        return True
159    return False
160
161
162def _is_list(obj):
163    # checks for list or tuples
164    # XXXX badly named!
165    return type(obj) in (list, tuple)
166
167
168def _instance_callable(obj):
169    """Given an object, return True if the object is callable.
170    For classes, return True if instances would be callable."""
171    if not isinstance(obj, type):
172        # already an instance
173        return getattr(obj, '__call__', None) is not None
174
175    # *could* be broken by a class overriding __mro__ or __dict__ via
176    # a metaclass
177    for base in (obj,) + obj.__mro__:
178        if base.__dict__.get('__call__') is not None:
179            return True
180    return False
181
182
def _set_signature(mock, original, instance=False):
    """Wrap *mock* in a real function that checks calls against *original*.

    The generated function has the signature ``(*args, **kwargs)``; it
    binds its arguments against the signature of *original* and then
    delegates to *mock*. If no signature can be obtained, *mock* is
    returned unchanged.
    """
    skipfirst = isinstance(original, type)
    found = _get_signature_object(original, instance, skipfirst)
    if found is None:
        return mock
    func, sig = found

    def checksig(*args, **kwargs):
        sig.bind(*args, **kwargs)
    _copy_func_details(func, checksig)

    # The name is spliced into generated source, so it must be a valid
    # identifier; otherwise use a fixed placeholder.
    original_name = original.__name__
    name = original_name if original_name.isidentifier() else 'funcopy'

    context = {'_checksig_': checksig, 'mock': mock}
    src = """def %s(*args, **kwargs):
    _checksig_(*args, **kwargs)
    return mock(*args, **kwargs)""" % name
    exec(src, context)
    funcopy = context[name]
    _setup_func(funcopy, mock, sig)
    return funcopy
208
def _set_async_signature(mock, original, instance=False, is_async_mock=False):
    """Wrap *mock* in an async function that checks calls against *original*.

    The generated coroutine function has the signature
    ``(*args, **kwargs)``; it binds its arguments against the signature of
    *original* and then awaits *mock*.

    If no signature can be obtained for *original*, *mock* is returned
    unchanged, matching ``_set_signature``. Previously this case failed
    with a TypeError while unpacking ``None``.

    *is_async_mock* is accepted for interface compatibility and is not
    used here.
    """
    skipfirst = isinstance(original, type)
    result = _get_signature_object(original, instance, skipfirst)
    if result is None:
        return mock
    func, sig = result

    def checksig(*args, **kwargs):
        sig.bind(*args, **kwargs)
    _copy_func_details(func, checksig)

    name = original.__name__
    if not name.isidentifier():
        # The name is spliced into generated source below; fall back to a
        # fixed placeholder rather than producing a SyntaxError.
        name = 'funcopy'
    context = {'_checksig_': checksig, 'mock': mock}
    src = """async def %s(*args, **kwargs):
    _checksig_(*args, **kwargs)
    return await mock(*args, **kwargs)""" % name
    exec(src, context)
    funcopy = context[name]
    _setup_func(funcopy, mock, sig)
    _setup_async_mock(funcopy)
    return funcopy
230
231
def _setup_func(funcopy, mock, sig):
    """Make the function *funcopy* look and behave like *mock*.

    *funcopy* gets call-recording attributes, forwarding versions of the
    mock's assert helpers, a ``reset_mock`` function and ``__signature__``
    set to *sig*. Finally *mock* is told to delegate to *funcopy* through
    ``_mock_delegate``.
    """
    funcopy.mock = mock

    # Forwarders: each looks the helper up on the mock at call time.
    def assert_called(*args, **kwargs):
        return mock.assert_called(*args, **kwargs)

    def assert_not_called(*args, **kwargs):
        return mock.assert_not_called(*args, **kwargs)

    def assert_called_once(*args, **kwargs):
        return mock.assert_called_once(*args, **kwargs)

    def assert_called_with(*args, **kwargs):
        return mock.assert_called_with(*args, **kwargs)

    def assert_called_once_with(*args, **kwargs):
        return mock.assert_called_once_with(*args, **kwargs)

    def assert_has_calls(*args, **kwargs):
        return mock.assert_has_calls(*args, **kwargs)

    def assert_any_call(*args, **kwargs):
        return mock.assert_any_call(*args, **kwargs)

    def reset_mock():
        funcopy.method_calls = _CallList()
        funcopy.mock_calls = _CallList()
        mock.reset_mock()
        ret = funcopy.return_value
        # Also reset a mock return value, unless it is the mock itself.
        if _is_instance_mock(ret) and ret is not mock:
            ret.reset_mock()

    # Fresh call-recording state.
    funcopy.called = False
    funcopy.call_count = 0
    funcopy.call_args = None
    funcopy.call_args_list = _CallList()
    funcopy.method_calls = _CallList()
    funcopy.mock_calls = _CallList()

    # Configuration taken over from the mock.
    funcopy.return_value = mock.return_value
    funcopy.side_effect = mock.side_effect
    funcopy._mock_children = mock._mock_children

    funcopy.assert_called = assert_called
    funcopy.assert_not_called = assert_not_called
    funcopy.assert_called_once = assert_called_once
    funcopy.assert_called_with = assert_called_with
    funcopy.assert_called_once_with = assert_called_once_with
    funcopy.assert_has_calls = assert_has_calls
    funcopy.assert_any_call = assert_any_call
    funcopy.reset_mock = reset_mock
    funcopy.__signature__ = sig

    # Must come last: from here on the mock's delegating properties read
    # and write the attributes set on funcopy above.
    mock._mock_delegate = funcopy
279
280
def _setup_async_mock(mock):
    """Add await-tracking attributes and await assert helpers to *mock*.

    *mock* is expected to expose the underlying mock object as ``mock.mock``;
    each helper forwards to the attribute of the same name on it.
    """
    mock._is_coroutine = asyncio.coroutines._is_coroutine
    mock.await_count = 0
    mock.await_args = None
    mock.await_args_list = _CallList()

    # Mock is not configured yet so the attributes are set
    # to a function and then the corresponding mock helper function
    # is called when the helper is accessed similar to _setup_func.
    def wrapper(attr, /, *args, **kwargs):
        return getattr(mock.mock, attr)(*args, **kwargs)

    await_helpers = (
        'assert_awaited',
        'assert_awaited_once',
        'assert_awaited_with',
        'assert_awaited_once_with',
        'assert_any_await',
        'assert_has_awaits',
        'assert_not_awaited',
    )
    for helper_name in await_helpers:
        # partial() binds the helper name now; a closure over the loop
        # variable would see only its final value.
        setattr(mock, helper_name, partial(wrapper, helper_name))
306
307
308def _is_magic(name):
309    return '__%s__' % name[2:-2] == name
310
311
312class _SentinelObject(object):
313    "A unique, named, sentinel object."
314    def __init__(self, name):
315        self.name = name
316
317    def __repr__(self):
318        return 'sentinel.%s' % self.name
319
320    def __reduce__(self):
321        return 'sentinel.%s' % self.name
322
323
324class _Sentinel(object):
325    """Access attributes to return a named object, usable as a sentinel."""
326    def __init__(self):
327        self._sentinels = {}
328
329    def __getattr__(self, name):
330        if name == '__bases__':
331            # Without this help(unittest.mock) raises an exception
332            raise AttributeError
333        return self._sentinels.setdefault(name, _SentinelObject(name))
334
335    def __reduce__(self):
336        return 'sentinel'
337
338
# Module-wide sentinel factory; attribute access yields named sentinels.
sentinel = _Sentinel()

# Marks a return value that has not been configured.
DEFAULT = sentinel.DEFAULT
# Default for dictionary lookups where None could be a stored value.
_missing = sentinel.MISSING
# Stored in a mock's children mapping to mark an attribute as deleted.
_deleted = sentinel.DELETED


# Attribute names that NonCallableMock.__setattr__ hands straight to
# object.__setattr__. _delegating_property() adds further names to this set.
_allowed_names = {
    'return_value', '_mock_return_value', 'side_effect',
    '_mock_side_effect', '_mock_parent', '_mock_new_parent',
    '_mock_name', '_mock_new_name'
}
351
352
def _delegating_property(name):
    """Create a property stored as ``_mock_<name>`` that honours delegation.

    While the owner's ``_mock_delegate`` is None, the value is read from
    the ``_mock_<name>`` attribute and written into the instance
    ``__dict__``. Otherwise reads and writes go to the attribute *name* on
    the delegate. *name* is also added to ``_allowed_names``.
    """
    _allowed_names.add(name)
    _the_name = '_mock_' + name

    def _get(self, name=name, _the_name=_the_name):
        delegate = self._mock_delegate
        if delegate is not None:
            return getattr(delegate, name)
        return getattr(self, _the_name)

    def _set(self, value, name=name, _the_name=_the_name):
        delegate = self._mock_delegate
        if delegate is not None:
            setattr(delegate, name, value)
        else:
            self.__dict__[_the_name] = value

    return property(_get, _set)
369
370
371
372class _CallList(list):
373
374    def __contains__(self, value):
375        if not isinstance(value, list):
376            return list.__contains__(self, value)
377        len_value = len(value)
378        len_self = len(self)
379        if len_value > len_self:
380            return False
381
382        for i in range(0, len_self - len_value + 1):
383            sub_list = self[i:i+len_value]
384            if sub_list == value:
385                return True
386        return False
387
388    def __repr__(self):
389        return pprint.pformat(list(self))
390
391
def _check_and_set_parent(parent, value, name, new_name):
    """Adopt the mock *value* as a child of *parent* if it is unattached.

    Returns False without changing anything when *value* is not a mock,
    already has a name or a parent, or is *parent* itself or one of its
    ancestors. Otherwise the parent/name pairs are set for whichever of
    *name* and *new_name* is truthy, and True is returned.
    """
    value = _extract_mock(value)
    if not _is_instance_mock(value):
        return False

    already_attached = (
        (value._mock_name or value._mock_new_name) or
        (value._mock_parent is not None) or
        (value._mock_new_parent is not None)
    )
    if already_attached:
        return False

    # setting a mock (value) as a child or return value of itself
    # should not modify the mock
    ancestor = parent
    while ancestor is not None:
        if ancestor is value:
            return False
        ancestor = ancestor._mock_new_parent

    if new_name:
        value._mock_new_parent = parent
        value._mock_new_name = new_name
    if name:
        value._mock_parent = parent
        value._mock_name = name
    return True
417
418# Internal class to identify if we wrapped an iterator object or not.
419class _MockIter(object):
420    def __init__(self, obj):
421        self.obj = iter(obj)
422    def __next__(self):
423        return next(self.obj)
424
class Base(object):
    """Root of the mock class hierarchy; supplies class-level defaults."""

    # Defaults used until an instance stores its own values.
    _mock_return_value = DEFAULT
    _mock_side_effect = None

    def __init__(self, /, *args, **kwargs):
        """Accept and ignore any arguments."""
430
431
432
433class NonCallableMock(Base):
434    """A non-callable version of `Mock`"""
435
436    # Store a mutex as a class attribute in order to protect concurrent access
437    # to mock attributes. Using a class attribute allows all NonCallableMock
438    # instances to share the mutex for simplicity.
439    #
440    # See https://github.com/python/cpython/issues/98624 for why this is
441    # necessary.
442    _lock = RLock()
443
444    def __new__(
445            cls, spec=None, wraps=None, name=None, spec_set=None,
446            parent=None, _spec_state=None, _new_name='', _new_parent=None,
447            _spec_as_instance=False, _eat_self=None, unsafe=False, **kwargs
448        ):
449        # every instance has its own class
450        # so we can create magic methods on the
451        # class without stomping on other mocks
452        bases = (cls,)
453        if not issubclass(cls, AsyncMockMixin):
454            # Check if spec is an async object or function
455            spec_arg = spec_set or spec
456            if spec_arg is not None and _is_async_obj(spec_arg):
457                bases = (AsyncMockMixin, cls)
458        new = type(cls.__name__, bases, {'__doc__': cls.__doc__})
459        instance = _safe_super(NonCallableMock, cls).__new__(new)
460        return instance
461
462
    def __init__(
            self, spec=None, wraps=None, name=None, spec_set=None,
            parent=None, _spec_state=None, _new_name='', _new_parent=None,
            _spec_as_instance=False, _eat_self=None, unsafe=False, **kwargs
        ):
        """Initialise the mock's internal state.

        State is written straight into the instance ``__dict__`` rather
        than through attribute assignment. Remaining keyword arguments are
        passed to ``configure_mock()``.
        """
        # The "new" parent defaults to the regular parent.
        if _new_parent is None:
            _new_parent = parent

        __dict__ = self.__dict__
        __dict__['_mock_parent'] = parent
        __dict__['_mock_name'] = name
        __dict__['_mock_new_name'] = _new_name
        __dict__['_mock_new_parent'] = _new_parent
        __dict__['_mock_sealed'] = False

        # From here on spec holds the spec object and spec_set is a flag.
        if spec_set is not None:
            spec = spec_set
            spec_set = True
        # By default only mocks that have a parent drop the first argument.
        if _eat_self is None:
            _eat_self = parent is not None

        self._mock_add_spec(spec, spec_set, _spec_as_instance, _eat_self)

        __dict__['_mock_children'] = {}
        __dict__['_mock_wraps'] = wraps
        __dict__['_mock_delegate'] = None

        # Backing storage for the call-recording properties.
        __dict__['_mock_called'] = False
        __dict__['_mock_call_args'] = None
        __dict__['_mock_call_count'] = 0
        __dict__['_mock_call_args_list'] = _CallList()
        __dict__['_mock_mock_calls'] = _CallList()

        __dict__['method_calls'] = _CallList()
        __dict__['_mock_unsafe'] = unsafe

        # Applied only after all internal state above exists.
        if kwargs:
            self.configure_mock(**kwargs)

        _safe_super(NonCallableMock, self).__init__(
            spec, wraps, name, spec_set, parent,
            _spec_state
        )
506
507
508    def attach_mock(self, mock, attribute):
509        """
510        Attach a mock as an attribute of this one, replacing its name and
511        parent. Calls to the attached mock will be recorded in the
512        `method_calls` and `mock_calls` attributes of this one."""
513        inner_mock = _extract_mock(mock)
514
515        inner_mock._mock_parent = None
516        inner_mock._mock_new_parent = None
517        inner_mock._mock_name = ''
518        inner_mock._mock_new_name = None
519
520        setattr(self, attribute, mock)
521
522
523    def mock_add_spec(self, spec, spec_set=False):
524        """Add a spec to a mock. `spec` can either be an object or a
525        list of strings. Only attributes on the `spec` can be fetched as
526        attributes from the mock.
527
528        If `spec_set` is True then only attributes on the spec can be set."""
529        self._mock_add_spec(spec, spec_set)
530
531
    def _mock_add_spec(self, spec, spec_set, _spec_as_instance=False,
                       _eat_self=False):
        """Record *spec* on this mock.

        Stores the spec's class, signature, attribute names and the names
        of its coroutine-function attributes in the instance ``__dict__``.
        A list or tuple spec is stored as given, with no class or
        signature.

        Raises InvalidSpecError if *spec* is itself a mock.
        """
        if _is_instance_mock(spec):
            raise InvalidSpecError(f'Cannot spec a Mock object. [object={spec!r}]')

        _spec_class = None
        _spec_signature = None
        _spec_asyncs = []

        if spec is not None and not _is_list(spec):
            if isinstance(spec, type):
                _spec_class = spec
            else:
                _spec_class = type(spec)
            res = _get_signature_object(spec,
                                        _spec_as_instance, _eat_self)
            # res is None or a (func, signature) tuple.
            _spec_signature = res and res[1]

            spec_list = dir(spec)

            for attr in spec_list:
                # getattr_static avoids running descriptors and properties.
                static_attr = inspect.getattr_static(spec, attr, None)
                unwrapped_attr = static_attr
                try:
                    unwrapped_attr = inspect.unwrap(unwrapped_attr)
                except ValueError:
                    pass
                if iscoroutinefunction(unwrapped_attr):
                    _spec_asyncs.append(attr)

            # From here on the spec is just its list of attribute names.
            spec = spec_list

        __dict__ = self.__dict__
        __dict__['_spec_class'] = _spec_class
        __dict__['_spec_set'] = spec_set
        __dict__['_spec_signature'] = _spec_signature
        __dict__['_mock_methods'] = spec
        __dict__['_spec_asyncs'] = _spec_asyncs
570
571    def __get_return_value(self):
572        ret = self._mock_return_value
573        if self._mock_delegate is not None:
574            ret = self._mock_delegate.return_value
575
576        if ret is DEFAULT and self._mock_wraps is None:
577            ret = self._get_child_mock(
578                _new_parent=self, _new_name='()'
579            )
580            self.return_value = ret
581        return ret
582
583
584    def __set_return_value(self, value):
585        if self._mock_delegate is not None:
586            self._mock_delegate.return_value = value
587        else:
588            self._mock_return_value = value
589            _check_and_set_parent(self, value, None, '()')
590
591    __return_value_doc = "The value to be returned when the mock is called."
592    return_value = property(__get_return_value, __set_return_value,
593                            __return_value_doc)
594
595
596    @property
597    def __class__(self):
598        if self._spec_class is None:
599            return type(self)
600        return self._spec_class
601
    # Call-recording attributes. Each is stored as ``_mock_<name>`` on the
    # instance, or on ``_mock_delegate`` under its plain name when a
    # delegate is set.
    called = _delegating_property('called')
    call_count = _delegating_property('call_count')
    call_args = _delegating_property('call_args')
    call_args_list = _delegating_property('call_args_list')
    mock_calls = _delegating_property('mock_calls')
607
608
609    def __get_side_effect(self):
610        delegated = self._mock_delegate
611        if delegated is None:
612            return self._mock_side_effect
613        sf = delegated.side_effect
614        if (sf is not None and not callable(sf)
615                and not isinstance(sf, _MockIter) and not _is_exception(sf)):
616            sf = _MockIter(sf)
617            delegated.side_effect = sf
618        return sf
619
620    def __set_side_effect(self, value):
621        value = _try_iter(value)
622        delegated = self._mock_delegate
623        if delegated is None:
624            self._mock_side_effect = value
625        else:
626            delegated.side_effect = value
627
628    side_effect = property(__get_side_effect, __set_side_effect)
629
630
631    def reset_mock(self, visited=None, *, return_value=False, side_effect=False):
632        "Restore the mock object to its initial state."
633        if visited is None:
634            visited = []
635        if id(self) in visited:
636            return
637        visited.append(id(self))
638
639        self.called = False
640        self.call_args = None
641        self.call_count = 0
642        self.mock_calls = _CallList()
643        self.call_args_list = _CallList()
644        self.method_calls = _CallList()
645
646        if return_value:
647            self._mock_return_value = DEFAULT
648        if side_effect:
649            self._mock_side_effect = None
650
651        for child in self._mock_children.values():
652            if isinstance(child, _SpecState) or child is _deleted:
653                continue
654            child.reset_mock(visited, return_value=return_value, side_effect=side_effect)
655
656        ret = self._mock_return_value
657        if _is_instance_mock(ret) and ret is not self:
658            ret.reset_mock(visited)
659
660
661    def configure_mock(self, /, **kwargs):
662        """Set attributes on the mock through keyword arguments.
663
664        Attributes plus return values and side effects can be set on child
665        mocks using standard dot notation and unpacking a dictionary in the
666        method call:
667
668        >>> attrs = {'method.return_value': 3, 'other.side_effect': KeyError}
669        >>> mock.configure_mock(**attrs)"""
670        for arg, val in sorted(kwargs.items(),
671                               # we sort on the number of dots so that
672                               # attributes are set before we set attributes on
673                               # attributes
674                               key=lambda entry: entry[0].count('.')):
675            args = arg.split('.')
676            final = args.pop()
677            obj = self
678            for entry in args:
679                obj = getattr(obj, entry)
680            setattr(obj, final, val)
681
682
    def __getattr__(self, name):
        """Return the child mock for *name*, creating it on first access.

        Raises AttributeError for names outside the spec, for magic
        method names, for deleted attributes and, unless the mock is
        unsafe, for names that look like misspelled assertions.
        """
        # These two are read below; if they are missing from the instance,
        # fail instead of recursing into this method.
        if name in {'_mock_methods', '_mock_unsafe'}:
            raise AttributeError(name)
        elif self._mock_methods is not None:
            if name not in self._mock_methods or name in _all_magics:
                raise AttributeError("Mock object has no attribute %r" % name)
        elif _is_magic(name):
            raise AttributeError(name)
        # Reject likely assertion typos unless the spec provides the name.
        if not self._mock_unsafe and (not self._mock_methods or name not in self._mock_methods):
            if name.startswith(('assert', 'assret', 'asert', 'aseert', 'assrt')) or name in _ATTRIB_DENY_LIST:
                raise AttributeError(
                    f"{name!r} is not a valid assertion. Use a spec "
                    f"for the mock if {name!r} is meant to be an attribute.")

        # Lookup and creation of the child happen under the class lock.
        with NonCallableMock._lock:
            result = self._mock_children.get(name)
            if result is _deleted:
                raise AttributeError(name)
            elif result is None:
                wraps = None
                if self._mock_wraps is not None:
                    # XXXX should we get the attribute without triggering code
                    # execution?
                    wraps = getattr(self._mock_wraps, name)

                result = self._get_child_mock(
                    parent=self, name=name, wraps=wraps, _new_name=name,
                    _new_parent=self
                )
                self._mock_children[name]  = result

            elif isinstance(result, _SpecState):
                # Placeholder child: build the autospecced mock now.
                try:
                    result = create_autospec(
                        result.spec, result.spec_set, result.instance,
                        result.parent, result.name
                    )
                except InvalidSpecError:
                    target_name = self.__dict__['_mock_name'] or self
                    raise InvalidSpecError(
                        f'Cannot autospec attr {name!r} from target '
                        f'{target_name!r} as it has already been mocked out. '
                        f'[target={self!r}, attr={result.spec!r}]')
                self._mock_children[name]  = result

        return result
729
730
    def _extract_mock_name(self):
        """Build this mock's dotted name by walking up its parent chain.

        The topmost mock contributes its ``_mock_name``, or ``'mock'`` if
        that is empty. No dot is placed in front of a ``()`` component.
        """
        _name_list = [self._mock_new_name]
        _parent = self._mock_new_parent
        last = self

        # Separator appended to the next parent's name.
        dot = '.'
        if _name_list == ['()']:
            dot = ''

        while _parent is not None:
            last = _parent

            _name_list.append(_parent._mock_new_name + dot)
            dot = '.'
            if _parent._mock_new_name == '()':
                dot = ''

            _parent = _parent._mock_new_parent

        # Names were collected child-first; flip to root-first.
        _name_list = list(reversed(_name_list))
        _first = last._mock_name or 'mock'
        if len(_name_list) > 1:
            if _name_list[1] not in ('()', '().'):
                _first += '.'
        # The first entry is the root's own new-name; replace it.
        _name_list[0] = _first
        return ''.join(_name_list)
757
758    def __repr__(self):
759        name = self._extract_mock_name()
760
761        name_string = ''
762        if name not in ('mock', 'mock.'):
763            name_string = ' name=%r' % name
764
765        spec_string = ''
766        if self._spec_class is not None:
767            spec_string = ' spec=%r'
768            if self._spec_set:
769                spec_string = ' spec_set=%r'
770            spec_string = spec_string % self._spec_class.__name__
771        return "<%s%s%s id='%s'>" % (
772            type(self).__name__,
773            name_string,
774            spec_string,
775            id(self)
776        )
777
778
779    def __dir__(self):
780        """Filter the output of `dir(mock)` to only useful members."""
781        if not FILTER_DIR:
782            return object.__dir__(self)
783
784        extras = self._mock_methods or []
785        from_type = dir(type(self))
786        from_dict = list(self.__dict__)
787        from_child_mocks = [
788            m_name for m_name, m_value in self._mock_children.items()
789            if m_value is not _deleted]
790
791        from_type = [e for e in from_type if not e.startswith('_')]
792        from_dict = [e for e in from_dict if not e.startswith('_') or
793                     _is_magic(e)]
794        return sorted(set(extras + from_type + from_dict + from_child_mocks))
795
796
    def __setattr__(self, name, value):
        """Set an attribute, applying spec, magic-method and seal rules.

        Raises AttributeError for names a ``spec_set`` does not allow, for
        unsupported magic methods, and for new attributes on a sealed
        mock.
        """
        if name in _allowed_names:
            # property setters go through here
            return object.__setattr__(self, name, value)
        elif (self._spec_set and self._mock_methods is not None and
            name not in self._mock_methods and
            name not in self.__dict__):
            raise AttributeError("Mock object has no attribute '%s'" % name)
        elif name in _unsupported_magics:
            msg = 'Attempting to set unsupported magic method %r.' % name
            raise AttributeError(msg)
        elif name in _all_magics:
            if self._mock_methods is not None and name not in self._mock_methods:
                raise AttributeError("Mock object has no attribute '%s'" % name)

            if not _is_instance_mock(value):
                # Magic methods are set on the per-instance class; the
                # instance attribute becomes a wrapper that passes self.
                setattr(type(self), name, _get_method(name, value))
                original = value
                value = lambda *args, **kw: original(self, *args, **kw)
            else:
                # only set _new_name and not name so that mock_calls is tracked
                # but not method calls
                _check_and_set_parent(self, value, None, name)
                setattr(type(self), name, value)
                self._mock_children[name] = value
        elif name == '__class__':
            # Assigning __class__ only changes the reported spec class.
            self._spec_class = value
            return
        else:
            if _check_and_set_parent(self, value, name, name):
                self._mock_children[name] = value

        if self._mock_sealed and not hasattr(self, name):
            mock_name = f'{self._extract_mock_name()}.{name}'
            raise AttributeError(f'Cannot set {mock_name}')

        # A PropertyMock is stored directly in the instance __dict__.
        if isinstance(value, PropertyMock):
            self.__dict__[name] = value
            return
        return object.__setattr__(self, name, value)
837
838
    def __delattr__(self, name):
        """Delete *name* and mark it as deleted in the children mapping.

        Raises AttributeError if *name* is already marked as deleted and
        is not in the instance ``__dict__``.
        """
        if name in _all_magics and name in type(self).__dict__:
            delattr(type(self), name)
            if name not in self.__dict__:
                # for magic methods that are still MagicProxy objects and
                # not set on the instance itself
                return

        obj = self._mock_children.get(name, _missing)
        if name in self.__dict__:
            _safe_super(NonCallableMock, self).__delattr__(name)
        elif obj is _deleted:
            raise AttributeError(name)
        if obj is not _missing:
            del self._mock_children[name]
        # Leave a marker behind for this name.
        self._mock_children[name] = _deleted
855
856
857    def _format_mock_call_signature(self, args, kwargs):
858        name = self._mock_name or 'mock'
859        return _format_call_signature(name, args, kwargs)
860
861
862    def _format_mock_failure_message(self, args, kwargs, action='call'):
863        message = 'expected %s not found.\nExpected: %s\n  Actual: %s'
864        expected_string = self._format_mock_call_signature(args, kwargs)
865        call_args = self.call_args
866        actual_string = self._format_mock_call_signature(*call_args)
867        return message % (action, expected_string, actual_string)
868
869
870    def _get_call_signature_from_name(self, name):
871        """
872        * If call objects are asserted against a method/function like obj.meth1
873        then there could be no name for the call object to lookup. Hence just
874        return the spec_signature of the method/function being asserted against.
875        * If the name is not empty then remove () and split by '.' to get
876        list of names to iterate through the children until a potential
877        match is found. A child mock is created only during attribute access
878        so if we get a _SpecState then no attributes of the spec were accessed
879        and can be safely exited.
880        """
881        if not name:
882            return self._spec_signature
883
884        sig = None
885        names = name.replace('()', '').split('.')
886        children = self._mock_children
887
888        for name in names:
889            child = children.get(name)
890            if child is None or isinstance(child, _SpecState):
891                break
892            else:
893                # If an autospecced object is attached using attach_mock the
894                # child would be a function with mock object as attribute from
895                # which signature has to be derived.
896                child = _extract_mock(child)
897                children = child._mock_children
898                sig = child._spec_signature
899
900        return sig
901
902
903    def _call_matcher(self, _call):
904        """
905        Given a call (or simply an (args, kwargs) tuple), return a
906        comparison key suitable for matching with other calls.
907        This is a best effort method which relies on the spec's signature,
908        if available, or falls back on the arguments themselves.
909        """
910
911        if isinstance(_call, tuple) and len(_call) > 2:
912            sig = self._get_call_signature_from_name(_call[0])
913        else:
914            sig = self._spec_signature
915
916        if sig is not None:
917            if len(_call) == 2:
918                name = ''
919                args, kwargs = _call
920            else:
921                name, args, kwargs = _call
922            try:
923                bound_call = sig.bind(*args, **kwargs)
924                return call(name, bound_call.args, bound_call.kwargs)
925            except TypeError as e:
926                return e.with_traceback(None)
927        else:
928            return _call
929
930    def assert_not_called(self):
931        """assert that the mock was never called.
932        """
933        if self.call_count != 0:
934            msg = ("Expected '%s' to not have been called. Called %s times.%s"
935                   % (self._mock_name or 'mock',
936                      self.call_count,
937                      self._calls_repr()))
938            raise AssertionError(msg)
939
940    def assert_called(self):
941        """assert that the mock was called at least once
942        """
943        if self.call_count == 0:
944            msg = ("Expected '%s' to have been called." %
945                   (self._mock_name or 'mock'))
946            raise AssertionError(msg)
947
948    def assert_called_once(self):
949        """assert that the mock was called only once.
950        """
951        if not self.call_count == 1:
952            msg = ("Expected '%s' to have been called once. Called %s times.%s"
953                   % (self._mock_name or 'mock',
954                      self.call_count,
955                      self._calls_repr()))
956            raise AssertionError(msg)
957
958    def assert_called_with(self, /, *args, **kwargs):
959        """assert that the last call was made with the specified arguments.
960
961        Raises an AssertionError if the args and keyword args passed in are
962        different to the last call to the mock."""
963        if self.call_args is None:
964            expected = self._format_mock_call_signature(args, kwargs)
965            actual = 'not called.'
966            error_message = ('expected call not found.\nExpected: %s\n  Actual: %s'
967                    % (expected, actual))
968            raise AssertionError(error_message)
969
970        def _error_message():
971            msg = self._format_mock_failure_message(args, kwargs)
972            return msg
973        expected = self._call_matcher(_Call((args, kwargs), two=True))
974        actual = self._call_matcher(self.call_args)
975        if actual != expected:
976            cause = expected if isinstance(expected, Exception) else None
977            raise AssertionError(_error_message()) from cause
978
979
980    def assert_called_once_with(self, /, *args, **kwargs):
981        """assert that the mock was called exactly once and that that call was
982        with the specified arguments."""
983        if not self.call_count == 1:
984            msg = ("Expected '%s' to be called once. Called %s times.%s"
985                   % (self._mock_name or 'mock',
986                      self.call_count,
987                      self._calls_repr()))
988            raise AssertionError(msg)
989        return self.assert_called_with(*args, **kwargs)
990
991
992    def assert_has_calls(self, calls, any_order=False):
993        """assert the mock has been called with the specified calls.
994        The `mock_calls` list is checked for the calls.
995
996        If `any_order` is False (the default) then the calls must be
997        sequential. There can be extra calls before or after the
998        specified calls.
999
1000        If `any_order` is True then the calls can be in any order, but
1001        they must all appear in `mock_calls`."""
1002        expected = [self._call_matcher(c) for c in calls]
1003        cause = next((e for e in expected if isinstance(e, Exception)), None)
1004        all_calls = _CallList(self._call_matcher(c) for c in self.mock_calls)
1005        if not any_order:
1006            if expected not in all_calls:
1007                if cause is None:
1008                    problem = 'Calls not found.'
1009                else:
1010                    problem = ('Error processing expected calls.\n'
1011                               'Errors: {}').format(
1012                                   [e if isinstance(e, Exception) else None
1013                                    for e in expected])
1014                raise AssertionError(
1015                    f'{problem}\n'
1016                    f'Expected: {_CallList(calls)}\n'
1017                    f'  Actual: {safe_repr(self.mock_calls)}'
1018                ) from cause
1019            return
1020
1021        all_calls = list(all_calls)
1022
1023        not_found = []
1024        for kall in expected:
1025            try:
1026                all_calls.remove(kall)
1027            except ValueError:
1028                not_found.append(kall)
1029        if not_found:
1030            raise AssertionError(
1031                '%r does not contain all of %r in its call list, '
1032                'found %r instead' % (self._mock_name or 'mock',
1033                                      tuple(not_found), all_calls)
1034            ) from cause
1035
1036
1037    def assert_any_call(self, /, *args, **kwargs):
1038        """assert the mock has been called with the specified arguments.
1039
1040        The assert passes if the mock has *ever* been called, unlike
1041        `assert_called_with` and `assert_called_once_with` that only pass if
1042        the call is the most recent one."""
1043        expected = self._call_matcher(_Call((args, kwargs), two=True))
1044        cause = expected if isinstance(expected, Exception) else None
1045        actual = [self._call_matcher(c) for c in self.call_args_list]
1046        if cause or expected not in _AnyComparer(actual):
1047            expected_string = self._format_mock_call_signature(args, kwargs)
1048            raise AssertionError(
1049                '%s call not found' % expected_string
1050            ) from cause
1051
1052
1053    def _get_child_mock(self, /, **kw):
1054        """Create the child mocks for attributes and return value.
1055        By default child mocks will be the same type as the parent.
1056        Subclasses of Mock may want to override this to customize the way
1057        child mocks are made.
1058
1059        For non-callable mocks the callable variant will be used (rather than
1060        any custom subclass)."""
1061        if self._mock_sealed:
1062            attribute = f".{kw['name']}" if "name" in kw else "()"
1063            mock_name = self._extract_mock_name() + attribute
1064            raise AttributeError(mock_name)
1065
1066        _new_name = kw.get("_new_name")
1067        if _new_name in self.__dict__['_spec_asyncs']:
1068            return AsyncMock(**kw)
1069
1070        _type = type(self)
1071        if issubclass(_type, MagicMock) and _new_name in _async_method_magics:
1072            # Any asynchronous magic becomes an AsyncMock
1073            klass = AsyncMock
1074        elif issubclass(_type, AsyncMockMixin):
1075            if (_new_name in _all_sync_magics or
1076                    self._mock_methods and _new_name in self._mock_methods):
1077                # Any synchronous method on AsyncMock becomes a MagicMock
1078                klass = MagicMock
1079            else:
1080                klass = AsyncMock
1081        elif not issubclass(_type, CallableMixin):
1082            if issubclass(_type, NonCallableMagicMock):
1083                klass = MagicMock
1084            elif issubclass(_type, NonCallableMock):
1085                klass = Mock
1086        else:
1087            klass = _type.__mro__[1]
1088        return klass(**kw)
1089
1090
1091    def _calls_repr(self):
1092        """Renders self.mock_calls as a string.
1093
1094        Example: "\nCalls: [call(1), call(2)]."
1095
1096        If self.mock_calls is empty, an empty string is returned. The
1097        output will be truncated if very long.
1098        """
1099        if not self.mock_calls:
1100            return ""
1101        return f"\nCalls: {safe_repr(self.mock_calls)}."
1102
1103
# Attribute names refused in safe mode: every `assert_*` name found on
# NonCallableMock, with the `assert_` prefix removed.
_ATTRIB_DENY_LIST = frozenset(
    attr.removeprefix("assert_")
    for attr in dir(NonCallableMock)
    if attr.startswith("assert_")
)
1110
1111
1112class _AnyComparer(list):
1113    """A list which checks if it contains a call which may have an
1114    argument of ANY, flipping the components of item and self from
1115    their traditional locations so that ANY is guaranteed to be on
1116    the left."""
1117    def __contains__(self, item):
1118        for _call in self:
1119            assert len(item) == len(_call)
1120            if all([
1121                expected == actual
1122                for expected, actual in zip(item, _call)
1123            ]):
1124                return True
1125        return False
1126
1127
1128def _try_iter(obj):
1129    if obj is None:
1130        return obj
1131    if _is_exception(obj):
1132        return obj
1133    if _callable(obj):
1134        return obj
1135    try:
1136        return iter(obj)
1137    except TypeError:
1138        # XXXX backwards compatibility
1139        # but this will blow up on first call - so maybe we should fail early?
1140        return obj
1141
1142
class CallableMixin(Base):
    """Mixin that makes a mock callable.

    Provides `__call__`, the bookkeeping that records every call on the
    mock and on its parents, and the logic that chooses the result of a
    call from `side_effect`, `return_value` and the wrapped object.
    """

    def __init__(self, spec=None, side_effect=None, return_value=DEFAULT,
                 wraps=None, name=None, spec_set=None, parent=None,
                 _spec_state=None, _new_name='', _new_parent=None, **kwargs):
        # stored straight into the instance __dict__, before the base class
        # is initialised
        self.__dict__['_mock_return_value'] = return_value
        _safe_super(CallableMixin, self).__init__(
            spec, wraps, name, spec_set, parent,
            _spec_state, _new_name, _new_parent, **kwargs
        )

        self.side_effect = side_effect


    def _mock_check_sig(self, /, *args, **kwargs):
        # stub method that can be replaced with one with a specific signature
        pass


    def __call__(self, /, *args, **kwargs):
        """Check the arguments, record the call, then produce its result."""
        # can't use self in-case a function / method we are mocking uses self
        # in the signature
        self._mock_check_sig(*args, **kwargs)
        self._increment_mock_call(*args, **kwargs)
        return self._mock_call(*args, **kwargs)


    def _mock_call(self, /, *args, **kwargs):
        """Produce the result of a call by delegating to
        `_execute_mock_call`."""
        return self._execute_mock_call(*args, **kwargs)

    def _increment_mock_call(self, /, *args, **kwargs):
        """Record a call with `args` and `kwargs`.

        Updates `called`, `call_count`, `call_args`, `call_args_list` and
        `mock_calls` on this mock, then walks up the `_mock_new_parent`
        chain appending the call, under an increasingly qualified name, to
        each ancestor's `mock_calls` (and `method_calls` where applicable).
        """
        self.called = True
        self.call_count += 1

        # handle call_args
        # needs to be set here so assertions on call arguments pass before
        # execution in the case of awaited calls
        _call = _Call((args, kwargs), two=True)
        self.call_args = _call
        self.call_args_list.append(_call)

        # initial stuff for method_calls:
        do_method_calls = self._mock_parent is not None
        method_call_name = self._mock_name

        # initial stuff for mock_calls:
        mock_call_name = self._mock_new_name
        is_a_call = mock_call_name == '()'
        # on the mock itself the call is recorded with an empty name
        self.mock_calls.append(_Call(('', args, kwargs)))

        # follow up the chain of mocks:
        _new_parent = self._mock_new_parent
        while _new_parent is not None:

            # handle method_calls:
            if do_method_calls:
                _new_parent.method_calls.append(_Call((method_call_name, args, kwargs)))
                # keep going only while the ancestor itself has a parent
                do_method_calls = _new_parent._mock_parent is not None
                if do_method_calls:
                    method_call_name = _new_parent._mock_name + '.' + method_call_name

            # handle mock_calls:
            this_mock_call = _Call((mock_call_name, args, kwargs))
            _new_parent.mock_calls.append(this_mock_call)

            if _new_parent._mock_new_name:
                # prefix the ancestor's name; a '()' segment is joined
                # without a dot
                if is_a_call:
                    dot = ''
                else:
                    dot = '.'
                is_a_call = _new_parent._mock_new_name == '()'
                mock_call_name = _new_parent._mock_new_name + dot + mock_call_name

            # follow the parental chain:
            _new_parent = _new_parent._mock_new_parent

    def _execute_mock_call(self, /, *args, **kwargs):
        """Return (or raise) the result of a call.

        `side_effect` is consulted first: an exception is raised, a
        non-callable is advanced with `next()` (raising the item if it is
        an exception), a callable is called with the arguments. Its result
        is returned unless it is `DEFAULT`. Otherwise `return_value` is
        returned, except that when no return value is set on the mock or
        its delegate and `wraps` is set, the wrapped object is called.
        """
        # separate from _increment_mock_call so that awaited functions are
        # executed separately from their call, also AsyncMock overrides this method

        effect = self.side_effect
        if effect is not None:
            if _is_exception(effect):
                raise effect
            elif not _callable(effect):
                result = next(effect)
                if _is_exception(result):
                    raise result
            else:
                result = effect(*args, **kwargs)

            # DEFAULT means "fall through to the normal return value"
            if result is not DEFAULT:
                return result

        # a return value was set on this mock
        if self._mock_return_value is not DEFAULT:
            return self.return_value

        # a return value was set on the delegate
        if self._mock_delegate and self._mock_delegate.return_value is not DEFAULT:
            return self.return_value

        # no return value set anywhere: pass the call to the wrapped object
        if self._mock_wraps is not None:
            return self._mock_wraps(*args, **kwargs)

        return self.return_value
1247
1248
1249
class Mock(CallableMixin, NonCallableMock):
    """
    Create a new `Mock` object. `Mock` takes several optional arguments
    that specify the behaviour of the Mock object:

    * `spec`: This can be either a list of strings or an existing object (a
      class or instance) that acts as the specification for the mock object. If
      you pass in an object then a list of strings is formed by calling dir on
      the object (excluding unsupported magic attributes and methods). Accessing
      any attribute not in this list will raise an `AttributeError`.

      If `spec` is an object (rather than a list of strings) then
      `mock.__class__` returns the class of the spec object. This allows mocks
      to pass `isinstance` tests.

    * `spec_set`: A stricter variant of `spec`. If used, attempting to *set*
      or get an attribute on the mock that isn't on the object passed as
      `spec_set` will raise an `AttributeError`.

    * `side_effect`: A function to be called whenever the Mock is called. See
      the `side_effect` attribute. Useful for raising exceptions or
      dynamically changing return values. The function is called with the same
      arguments as the mock, and unless it returns `DEFAULT`, the return
      value of this function is used as the return value.

      Alternatively `side_effect` can be an exception class or instance. In
      this case the exception will be raised when the mock is called.

      If `side_effect` is an iterable then each call to the mock will return
      the next value from the iterable. If any of the members of the iterable
      are exceptions they will be raised instead of returned.

    * `return_value`: The value returned when the mock is called. By default
      this is a new Mock (created on first access). See the
      `return_value` attribute.

    * `unsafe`: By default, accessing any attribute whose name starts with
      *assert*, *assret*, *asert*, *aseert*, or *assrt* raises an AttributeError.
      Additionally, an AttributeError is raised when accessing
      attributes that match the name of an assertion method without the prefix
      `assert_`, e.g. accessing `called_once` instead of `assert_called_once`.
      Passing `unsafe=True` will allow access to these attributes.

    * `wraps`: Item for the mock object to wrap. If `wraps` is not None then
      calling the Mock will pass the call through to the wrapped object
      (returning the real result). Attribute access on the mock will return a
      Mock object that wraps the corresponding attribute of the wrapped object
      (so attempting to access an attribute that doesn't exist will raise an
      `AttributeError`).

      If the mock has an explicit `return_value` set then calls are not passed
      to the wrapped object and the `return_value` is returned instead.

    * `name`: If the mock has a name then it will be used in the repr of the
      mock. This can be useful for debugging. The name is propagated to child
      mocks.

    Mocks can also be called with arbitrary keyword arguments. These will be
    used to set attributes on the mock after it is created.
    """
1307
1308
1309# _check_spec_arg_typos takes kwargs from commands like patch and checks that
1310# they don't contain common misspellings of arguments related to autospeccing.
1311def _check_spec_arg_typos(kwargs_to_check):
1312    typos = ("autospect", "auto_spec", "set_spec")
1313    for typo in typos:
1314        if typo in kwargs_to_check:
1315            raise RuntimeError(
1316                f"{typo!r} might be a typo; use unsafe=True if this is intended"
1317            )
1318
1319
1320class _patch(object):
1321
1322    attribute_name = None
1323    _active_patches = []
1324
1325    def __init__(
1326            self, getter, attribute, new, spec, create,
1327            spec_set, autospec, new_callable, kwargs, *, unsafe=False
1328        ):
1329        if new_callable is not None:
1330            if new is not DEFAULT:
1331                raise ValueError(
1332                    "Cannot use 'new' and 'new_callable' together"
1333                )
1334            if autospec is not None:
1335                raise ValueError(
1336                    "Cannot use 'autospec' and 'new_callable' together"
1337                )
1338        if not unsafe:
1339            _check_spec_arg_typos(kwargs)
1340        if _is_instance_mock(spec):
1341            raise InvalidSpecError(
1342                f'Cannot spec attr {attribute!r} as the spec '
1343                f'has already been mocked out. [spec={spec!r}]')
1344        if _is_instance_mock(spec_set):
1345            raise InvalidSpecError(
1346                f'Cannot spec attr {attribute!r} as the spec_set '
1347                f'target has already been mocked out. [spec_set={spec_set!r}]')
1348
1349        self.getter = getter
1350        self.attribute = attribute
1351        self.new = new
1352        self.new_callable = new_callable
1353        self.spec = spec
1354        self.create = create
1355        self.has_local = False
1356        self.spec_set = spec_set
1357        self.autospec = autospec
1358        self.kwargs = kwargs
1359        self.additional_patchers = []
1360        self.is_started = False
1361
1362
1363    def copy(self):
1364        patcher = _patch(
1365            self.getter, self.attribute, self.new, self.spec,
1366            self.create, self.spec_set,
1367            self.autospec, self.new_callable, self.kwargs
1368        )
1369        patcher.attribute_name = self.attribute_name
1370        patcher.additional_patchers = [
1371            p.copy() for p in self.additional_patchers
1372        ]
1373        return patcher
1374
1375
1376    def __call__(self, func):
1377        if isinstance(func, type):
1378            return self.decorate_class(func)
1379        if inspect.iscoroutinefunction(func):
1380            return self.decorate_async_callable(func)
1381        return self.decorate_callable(func)
1382
1383
1384    def decorate_class(self, klass):
1385        for attr in dir(klass):
1386            if not attr.startswith(patch.TEST_PREFIX):
1387                continue
1388
1389            attr_value = getattr(klass, attr)
1390            if not hasattr(attr_value, "__call__"):
1391                continue
1392
1393            patcher = self.copy()
1394            setattr(klass, attr, patcher(attr_value))
1395        return klass
1396
1397
1398    @contextlib.contextmanager
1399    def decoration_helper(self, patched, args, keywargs):
1400        extra_args = []
1401        with contextlib.ExitStack() as exit_stack:
1402            for patching in patched.patchings:
1403                arg = exit_stack.enter_context(patching)
1404                if patching.attribute_name is not None:
1405                    keywargs.update(arg)
1406                elif patching.new is DEFAULT:
1407                    extra_args.append(arg)
1408
1409            args += tuple(extra_args)
1410            yield (args, keywargs)
1411
1412
1413    def decorate_callable(self, func):
1414        # NB. Keep the method in sync with decorate_async_callable()
1415        if hasattr(func, 'patchings'):
1416            func.patchings.append(self)
1417            return func
1418
1419        @wraps(func)
1420        def patched(*args, **keywargs):
1421            with self.decoration_helper(patched,
1422                                        args,
1423                                        keywargs) as (newargs, newkeywargs):
1424                return func(*newargs, **newkeywargs)
1425
1426        patched.patchings = [self]
1427        return patched
1428
1429
1430    def decorate_async_callable(self, func):
1431        # NB. Keep the method in sync with decorate_callable()
1432        if hasattr(func, 'patchings'):
1433            func.patchings.append(self)
1434            return func
1435
1436        @wraps(func)
1437        async def patched(*args, **keywargs):
1438            with self.decoration_helper(patched,
1439                                        args,
1440                                        keywargs) as (newargs, newkeywargs):
1441                return await func(*newargs, **newkeywargs)
1442
1443        patched.patchings = [self]
1444        return patched
1445
1446
1447    def get_original(self):
1448        target = self.getter()
1449        name = self.attribute
1450
1451        original = DEFAULT
1452        local = False
1453
1454        try:
1455            original = target.__dict__[name]
1456        except (AttributeError, KeyError):
1457            original = getattr(target, name, DEFAULT)
1458        else:
1459            local = True
1460
1461        if name in _builtins and isinstance(target, ModuleType):
1462            self.create = True
1463
1464        if not self.create and original is DEFAULT:
1465            raise AttributeError(
1466                "%s does not have the attribute %r" % (target, name)
1467            )
1468        return original, local
1469
1470
1471    def __enter__(self):
1472        """Perform the patch."""
1473        if self.is_started:
1474            raise RuntimeError("Patch is already started")
1475
1476        new, spec, spec_set = self.new, self.spec, self.spec_set
1477        autospec, kwargs = self.autospec, self.kwargs
1478        new_callable = self.new_callable
1479        self.target = self.getter()
1480
1481        # normalise False to None
1482        if spec is False:
1483            spec = None
1484        if spec_set is False:
1485            spec_set = None
1486        if autospec is False:
1487            autospec = None
1488
1489        if spec is not None and autospec is not None:
1490            raise TypeError("Can't specify spec and autospec")
1491        if ((spec is not None or autospec is not None) and
1492            spec_set not in (True, None)):
1493            raise TypeError("Can't provide explicit spec_set *and* spec or autospec")
1494
1495        original, local = self.get_original()
1496
1497        if new is DEFAULT and autospec is None:
1498            inherit = False
1499            if spec is True:
1500                # set spec to the object we are replacing
1501                spec = original
1502                if spec_set is True:
1503                    spec_set = original
1504                    spec = None
1505            elif spec is not None:
1506                if spec_set is True:
1507                    spec_set = spec
1508                    spec = None
1509            elif spec_set is True:
1510                spec_set = original
1511
1512            if spec is not None or spec_set is not None:
1513                if original is DEFAULT:
1514                    raise TypeError("Can't use 'spec' with create=True")
1515                if isinstance(original, type):
1516                    # If we're patching out a class and there is a spec
1517                    inherit = True
1518
1519            # Determine the Klass to use
1520            if new_callable is not None:
1521                Klass = new_callable
1522            elif spec is None and _is_async_obj(original):
1523                Klass = AsyncMock
1524            elif spec is not None or spec_set is not None:
1525                this_spec = spec
1526                if spec_set is not None:
1527                    this_spec = spec_set
1528                if _is_list(this_spec):
1529                    not_callable = '__call__' not in this_spec
1530                else:
1531                    not_callable = not callable(this_spec)
1532                if _is_async_obj(this_spec):
1533                    Klass = AsyncMock
1534                elif not_callable:
1535                    Klass = NonCallableMagicMock
1536                else:
1537                    Klass = MagicMock
1538            else:
1539                Klass = MagicMock
1540
1541            _kwargs = {}
1542            if spec is not None:
1543                _kwargs['spec'] = spec
1544            if spec_set is not None:
1545                _kwargs['spec_set'] = spec_set
1546
1547            # add a name to mocks
1548            if (isinstance(Klass, type) and
1549                issubclass(Klass, NonCallableMock) and self.attribute):
1550                _kwargs['name'] = self.attribute
1551
1552            _kwargs.update(kwargs)
1553            new = Klass(**_kwargs)
1554
1555            if inherit and _is_instance_mock(new):
1556                # we can only tell if the instance should be callable if the
1557                # spec is not a list
1558                this_spec = spec
1559                if spec_set is not None:
1560                    this_spec = spec_set
1561                if (not _is_list(this_spec) and not
1562                    _instance_callable(this_spec)):
1563                    Klass = NonCallableMagicMock
1564
1565                _kwargs.pop('name')
1566                new.return_value = Klass(_new_parent=new, _new_name='()',
1567                                         **_kwargs)
1568        elif autospec is not None:
1569            # spec is ignored, new *must* be default, spec_set is treated
1570            # as a boolean. Should we check spec is not None and that spec_set
1571            # is a bool?
1572            if new is not DEFAULT:
1573                raise TypeError(
1574                    "autospec creates the mock for you. Can't specify "
1575                    "autospec and new."
1576                )
1577            if original is DEFAULT:
1578                raise TypeError("Can't use 'autospec' with create=True")
1579            spec_set = bool(spec_set)
1580            if autospec is True:
1581                autospec = original
1582
1583            if _is_instance_mock(self.target):
1584                raise InvalidSpecError(
1585                    f'Cannot autospec attr {self.attribute!r} as the patch '
1586                    f'target has already been mocked out. '
1587                    f'[target={self.target!r}, attr={autospec!r}]')
1588            if _is_instance_mock(autospec):
1589                target_name = getattr(self.target, '__name__', self.target)
1590                raise InvalidSpecError(
1591                    f'Cannot autospec attr {self.attribute!r} from target '
1592                    f'{target_name!r} as it has already been mocked out. '
1593                    f'[target={self.target!r}, attr={autospec!r}]')
1594
1595            new = create_autospec(autospec, spec_set=spec_set,
1596                                  _name=self.attribute, **kwargs)
1597        elif kwargs:
1598            # can't set keyword args when we aren't creating the mock
1599            # XXXX If new is a Mock we could call new.configure_mock(**kwargs)
1600            raise TypeError("Can't pass kwargs to a mock we aren't creating")
1601
1602        new_attr = new
1603
1604        self.temp_original = original
1605        self.is_local = local
1606        self._exit_stack = contextlib.ExitStack()
1607        self.is_started = True
1608        try:
1609            setattr(self.target, self.attribute, new_attr)
1610            if self.attribute_name is not None:
1611                extra_args = {}
1612                if self.new is DEFAULT:
1613                    extra_args[self.attribute_name] =  new
1614                for patching in self.additional_patchers:
1615                    arg = self._exit_stack.enter_context(patching)
1616                    if patching.new is DEFAULT:
1617                        extra_args.update(arg)
1618                return extra_args
1619
1620            return new
1621        except:
1622            if not self.__exit__(*sys.exc_info()):
1623                raise
1624
1625    def __exit__(self, *exc_info):
1626        """Undo the patch."""
1627        if not self.is_started:
1628            return
1629
1630        if self.is_local and self.temp_original is not DEFAULT:
1631            setattr(self.target, self.attribute, self.temp_original)
1632        else:
1633            delattr(self.target, self.attribute)
1634            if not self.create and (not hasattr(self.target, self.attribute) or
1635                        self.attribute in ('__doc__', '__module__',
1636                                           '__defaults__', '__annotations__',
1637                                           '__kwdefaults__')):
1638                # needed for proxy objects like django settings
1639                setattr(self.target, self.attribute, self.temp_original)
1640
1641        del self.temp_original
1642        del self.is_local
1643        del self.target
1644        exit_stack = self._exit_stack
1645        del self._exit_stack
1646        self.is_started = False
1647        return exit_stack.__exit__(*exc_info)
1648
1649
1650    def start(self):
1651        """Activate a patch, returning any created mock."""
1652        result = self.__enter__()
1653        self._active_patches.append(self)
1654        return result
1655
1656
1657    def stop(self):
1658        """Stop an active patch."""
1659        try:
1660            self._active_patches.remove(self)
1661        except ValueError:
1662            # If the patch hasn't been started this will fail
1663            return None
1664
1665        return self.__exit__(None, None, None)
1666
1667
1668
def _get_target(target):
    """Split a dotted patch target into ``(getter, attribute)``.

    `target` is split at its last dot.  The part before the dot is wrapped
    in a zero-argument callable that resolves it with
    `pkgutil.resolve_name` when called; the part after the dot is returned
    as the attribute name.

    Raises `TypeError` if `target` cannot be split that way (not a string,
    or no dot in it).
    """
    try:
        owner_path, attribute = target.rsplit('.', 1)
    except (TypeError, ValueError, AttributeError):
        message = f"Need a valid target to patch. You supplied: {target!r}"
        raise TypeError(message)
    resolver = partial(pkgutil.resolve_name, owner_path)
    return resolver, attribute
1676
1677
def _patch_object(
        target, attribute, new=DEFAULT, spec=None,
        create=False, spec_set=None, autospec=None,
        new_callable=None, *, unsafe=False, **kwargs
    ):
    """
    Patch the member named `attribute` on the object `target` with a mock
    object.

    `target` must be the object itself; passing a `str` raises `TypeError`.

    The arguments `new`, `spec`, `create`, `spec_set`, `autospec` and
    `new_callable` mean the same as they do for `patch`, and, as with
    `patch`, arbitrary keyword arguments are accepted to configure the mock
    object that gets created.

    `patch.object` can be used as a decorator, class decorator or a context
    manager.  As a class decorator it honours `patch.TEST_PREFIX` when
    choosing which methods to wrap.
    """
    if type(target) is str:
        message = (
            f"{target!r} must be the actual object to be patched, not a str"
        )
        raise TypeError(message)

    # The patcher asks for its target lazily, so hand it a callable that
    # simply returns the object we were given.
    getter = lambda: target
    patcher = _patch(
        getter, attribute, new, spec, create,
        spec_set, autospec, new_callable, kwargs, unsafe=unsafe
    )
    return patcher
1705
1706
def _patch_multiple(target, spec=None, create=False, spec_set=None,
                    autospec=None, new_callable=None, **kwargs):
    """Perform multiple patches in a single call.

    `target` is the object to be patched, given either as the object itself
    or as a string naming an object to fetch by importing.  Each keyword
    argument names one attribute to patch and the value to patch it with::

        with patch.multiple(settings, FIRST_PATCH='one', SECOND_PATCH='two'):
            ...

    Pass `DEFAULT` as a value to have `patch.multiple` create the mock for
    you.  Mocks created that way are passed into a decorated function by
    keyword, and a dictionary is returned when `patch.multiple` is used as
    a context manager.

    `spec`, `spec_set`, `create`, `autospec` and `new_callable` have the
    same meaning as for `patch` and are applied to *all* of the patches
    done by `patch.multiple`.

    `patch.multiple` can be used as a decorator, class decorator or a
    context manager.  As a class decorator it honours `patch.TEST_PREFIX`
    when choosing which methods to wrap.

    Raises `ValueError` when no keyword argument is supplied.
    """
    if type(target) is str:
        getter = partial(pkgutil.resolve_name, target)
    else:
        getter = lambda: target

    if not kwargs:
        raise ValueError(
            'Must supply at least one keyword argument with patch.multiple'
        )

    def _make_patcher(attribute, new):
        # One patcher per keyword; each gets its own empty kwargs dict.
        single = _patch(
            getter, attribute, new, spec, create, spec_set,
            autospec, new_callable, {}
        )
        single.attribute_name = attribute
        return single

    # The first keyword becomes the patcher that is returned; the others
    # are attached to it as additional patchers, in keyword order.
    (first_attribute, first_new), *remaining = kwargs.items()
    primary = _make_patcher(first_attribute, first_new)
    for attribute, new in remaining:
        primary.additional_patchers.append(_make_patcher(attribute, new))
    return primary
1754
1755
def patch(
        target, new=DEFAULT, spec=None, create=False,
        spec_set=None, autospec=None, new_callable=None, *, unsafe=False, **kwargs
    ):
    """
    Patch `target` with a `new` object for the duration of a function call
    or `with` block.

    `patch` acts as a function decorator, class decorator or a context
    manager. Inside the body of the function or with statement, the `target`
    is patched with a `new` object. When the function/with statement exits
    the patch is undone.

    `target`
        A string in the form `'package.module.ClassName'`. The `target` is
        imported and the specified object replaced with the `new` object, so
        the `target` must be importable from the environment you are calling
        `patch` from. The import happens when the decorated function is
        executed, not at decoration time.

    `new`
        The replacement object. If omitted, the target is replaced with an
        `AsyncMock` if the patched object is an async function or a
        `MagicMock` otherwise. When `new` is omitted and `patch` is used as
        a decorator, the created mock is passed in as an extra argument to
        the decorated function; when used as a context manager, the created
        mock is returned by the context manager.

    `spec`, `spec_set`
        Passed to the `MagicMock` if patch is creating one for you. You can
        also pass `spec=True` or `spec_set=True`, which causes patch to pass
        in the object being mocked as the spec/spec_set object.

    `new_callable`
        A different class, or callable object, that will be called to create
        the `new` object. By default `AsyncMock` is used for async functions
        and `MagicMock` for the rest.

    `autospec`
        A more powerful form of `spec`. With `autospec=True` the mock is
        created with a spec from the object being replaced, and all
        attributes of the mock also have the spec of the corresponding
        attribute of that object. Methods and functions being mocked have
        their arguments checked and raise a `TypeError` if they are called
        with the wrong signature. For mocks replacing a class, the return
        value (the 'instance') has the same spec as the class. Pass
        `autospec=some_object` to use an arbitrary object as the spec
        instead of the one being replaced.

    `create`
        By default `patch` will fail to replace attributes that don't exist.
        With `create=True`, if the attribute doesn't exist, patch creates it
        when the patched function is called and deletes it again afterwards.
        This is useful for writing tests against attributes that your
        production code creates at runtime. It is off by default because it
        can be dangerous: with it switched on you can write passing tests
        against APIs that don't actually exist!

    `unsafe`
        Patch will raise a `RuntimeError` if passed some common misspellings
        of the arguments autospec and spec_set. Pass `unsafe=True` to
        disable that check.

    `**kwargs`
        Arbitrary keyword arguments. These are passed to `AsyncMock` if the
        patched object is asynchronous, to `MagicMock` otherwise, or to
        `new_callable` if specified.

    As a `TestCase` class decorator, patch works by decorating each test
    method in the class, which reduces the boilerplate when your test
    methods share a common set of patchings. Test methods are found by
    looking for names that start with `patch.TEST_PREFIX`. By default this
    is `test`, which matches the way `unittest` finds tests; set
    `patch.TEST_PREFIX` to use an alternative prefix.

    As a context manager, the patching applies to the indented block after
    the with statement. If you use "as" then the patched object is bound to
    the name after the "as"; very useful if `patch` is creating a mock
    object for you.

    `patch.dict(...)`, `patch.multiple(...)` and `patch.object(...)` are
    available for alternate use-cases.
    """
    getter, attribute = _get_target(target)
    patcher = _patch(
        getter, attribute, new, spec, create,
        spec_set, autospec, new_callable, kwargs, unsafe=unsafe
    )
    return patcher
1836
1837
class _patch_dict(object):
    """
    Patch a dictionary, or dictionary like object, and restore it to its
    original state after the test.

    `in_dict` can be a dictionary or a mapping like container. A mapping
    must at least support getting, setting and deleting items plus
    iterating over keys.  `in_dict` can also be a string specifying the name
    of the dictionary, which will then be fetched by importing it.

    `values` can be a dictionary of values to set in the dictionary, or an
    iterable of `(key, value)` pairs.

    If `clear` is True then the dictionary will be cleared before the new
    values are set.

    Arbitrary keyword arguments can also be given to set values in the
    dictionary::

        with patch.dict('sys.modules', mymodule=Mock(), other_module=Mock()):
            ...

    `patch.dict` can be used as a context manager, decorator or class
    decorator. When used as a class decorator `patch.dict` honours
    `patch.TEST_PREFIX` for choosing which methods to wrap.
    """

    def __init__(self, in_dict, values=(), clear=False, **kwargs):
        # `values` may be anything the dict(...) constructor accepts;
        # keyword arguments are layered on top of it.
        merged = dict(values)
        merged.update(kwargs)

        self.in_dict = in_dict
        self.values = merged
        self.clear = clear
        # Snapshot of the dict contents, taken each time it is patched.
        self._original = None


    def __call__(self, f):
        """Decorate `f`, which may be a class, a coroutine function or any
        other callable."""
        if isinstance(f, type):
            decorate = self.decorate_class
        elif inspect.iscoroutinefunction(f):
            decorate = self.decorate_async_callable
        else:
            decorate = self.decorate_callable
        return decorate(f)


    def decorate_callable(self, f):
        """Return a wrapper that patches the dict around each call of `f`."""
        @wraps(f)
        def _inner(*args, **kw):
            self._patch_dict()
            try:
                return f(*args, **kw)
            finally:
                # Restore even when `f` raises.
                self._unpatch_dict()

        return _inner


    def decorate_async_callable(self, f):
        """Return a coroutine wrapper that patches the dict while `f` is
        awaited."""
        @wraps(f)
        async def _inner(*args, **kw):
            self._patch_dict()
            try:
                return await f(*args, **kw)
            finally:
                # Restore even when `f` raises.
                self._unpatch_dict()

        return _inner


    def decorate_class(self, klass):
        """Wrap every callable attribute of `klass` whose name starts with
        `patch.TEST_PREFIX`, then return `klass`.

        Each wrapped attribute gets its own `_patch_dict` built from this
        one's `in_dict`, `values` and `clear`.
        """
        for name in dir(klass):
            member = getattr(klass, name)
            if not name.startswith(patch.TEST_PREFIX):
                continue
            if not hasattr(member, "__call__"):
                continue
            per_method = _patch_dict(self.in_dict, self.values, self.clear)
            setattr(klass, name, per_method(member))
        return klass


    def __enter__(self):
        """Patch the dict."""
        self._patch_dict()
        return self.in_dict


    def _patch_dict(self):
        """Snapshot the target's contents, then apply `values` to it."""
        new_values = self.values
        if isinstance(self.in_dict, str):
            # A dotted name: resolve it once and keep the resolved object.
            self.in_dict = pkgutil.resolve_name(self.in_dict)
        target = self.in_dict
        clear_first = self.clear

        try:
            snapshot = target.copy()
        except AttributeError:
            # dict like object with no copy method
            # must support iteration over keys
            snapshot = {key: target[key] for key in target}
        self._original = snapshot

        if clear_first:
            _clear_dict(target)

        try:
            target.update(new_values)
        except AttributeError:
            # dict like object with no update method
            for key in new_values:
                target[key] = new_values[key]


    def _unpatch_dict(self):
        """Empty the target and refill it from the saved snapshot."""
        target = self.in_dict
        snapshot = self._original

        _clear_dict(target)

        try:
            target.update(snapshot)
        except AttributeError:
            # dict like object with no update method
            for key in snapshot:
                target[key] = snapshot[key]


    def __exit__(self, *args):
        """Unpatch the dict."""
        # No snapshot means the dict was never patched: nothing to restore.
        if self._original is not None:
            self._unpatch_dict()
        return False


    def start(self):
        """Activate the patch and register it as active.

        Returns the result of `__enter__`, i.e. the patched dict.
        """
        patched = self.__enter__()
        _patch._active_patches.append(self)
        return patched


    def stop(self):
        """Stop an active patch.

        Returns None when this patcher is not registered as active.
        """
        try:
            _patch._active_patches.remove(self)
        except ValueError:
            # If the patch hasn't been started this will fail
            return None
        else:
            return self.__exit__(None, None, None)
1989
1990
def _clear_dict(in_dict):
    """Remove every item from `in_dict` in place.

    Uses `in_dict.clear()` when available; otherwise falls back to deleting
    the keys one by one, which only requires iteration and item deletion.
    """
    try:
        in_dict.clear()
    except AttributeError:
        # Copy the keys first so the container is not changed while it is
        # being iterated.
        for key in list(in_dict):
            del in_dict[key]
1998
1999
def _patch_stopall():
    """Stop all active patches. LIFO to unroll nested patches."""
    # Walk the live registry from the newest entry to the oldest.
    for active in reversed(_patch._active_patches):
        active.stop()
2004
2005
# Attach the alternate patchers and helpers to `patch` itself so they are
# reachable as `patch.object`, `patch.dict`, `patch.multiple` and
# `patch.stopall`.
patch.object = _patch_object
patch.dict = _patch_dict
patch.multiple = _patch_multiple
patch.stopall = _patch_stopall
# Name prefix the class decorators use to decide which methods to wrap.
patch.TEST_PREFIX = 'test'
2011
# Space separated names of the supported magic methods, without the
# surrounding double underscores.
magic_methods = (
    "lt le gt ge eq ne "
    "getitem setitem delitem "
    "len contains iter "
    "hash str sizeof "
    "enter exit "
    # we added divmod and rdivmod here instead of numerics
    # because there is no idivmod
    "divmod rdivmod neg pos abs invert "
    "complex int float index "
    "round trunc floor ceil "
    "bool next "
    "fspath "
    "aiter "
)

# Binary numeric operators; the in-place and reflected variants are derived
# from this list by prefixing each name with 'i' or 'r'.
numerics = (
    "add sub mul matmul truediv floordiv mod lshift rshift and xor or pow"
)
inplace = ' '.join(f'i{name}' for name in numerics.split())
right = ' '.join(f'r{name}' for name in numerics.split())

# not including __prepare__, __instancecheck__, __subclasscheck__
# (as they are metaclass methods)
# __del__ is not supported at all as it causes problems if it exists

_non_defaults = {
    '__get__', '__set__', '__delete__', '__reversed__', '__missing__',
    '__reduce__', '__reduce_ex__', '__getinitargs__', '__getnewargs__',
    '__getstate__', '__setstate__', '__getformat__',
    '__repr__', '__dir__', '__subclasses__', '__format__',
    '__getnewargs_ex__',
}
2045
2046
def _get_method(name, func):
    """Turns a callable object (like a mock) into a real function.

    Returns a plain function whose `__name__` is `name` and which forwards
    its `self` and all other arguments to `func`, returning its result.
    """
    def method(self, /, *args, **kw):
        return func(self, *args, **kw)
    method.__name__ = name
    return method
2053
2054
# Dunder names of every supported synchronous magic method: the plain magic
# methods plus the numeric operators and their in-place / reflected forms.
_magics = {
    f'__{method}__' for method in
    ' '.join((magic_methods, numerics, inplace, right)).split()
}

# Magic methods used for async `with` statements
_async_method_magics = {"__aenter__", "__aexit__", "__anext__"}
# Magic methods that are only used with async calls but are synchronous functions themselves
_sync_async_magics = {"__aiter__"}
_async_magics = _async_method_magics.union(_sync_async_magics)

_all_sync_magics = _magics.union(_non_defaults)
_all_magics = _all_sync_magics.union(_async_magics)

_unsupported_magics = {
    '__getattr__', '__setattr__',
    '__init__', '__new__', '__prepare__',
    '__instancecheck__', '__subclasscheck__',
    '__del__'
}

# Magic methods whose default return value is computed from the mock itself.
_calculate_return_value = {
    '__hash__': lambda self: object.__hash__(self),
    '__str__': lambda self: object.__str__(self),
    '__sizeof__': lambda self: object.__sizeof__(self),
    '__fspath__': lambda self: f"{type(self).__name__}/{self._extract_mock_name()}/{id(self)}",
}

# Magic methods with a fixed default return value.
_return_values = {
    '__lt__': NotImplemented,
    '__gt__': NotImplemented,
    '__le__': NotImplemented,
    '__ge__': NotImplemented,
    '__int__': 1,
    '__contains__': False,
    '__len__': 0,
    '__exit__': False,
    '__complex__': 1j,
    '__float__': 1.0,
    '__bool__': True,
    '__index__': 1,
    '__aexit__': False,
}
2098
2099
def _get_eq(self):
    """Build the default ``__eq__`` side effect for the mock `self`.

    The returned function gives back the return value configured on
    ``self.__eq__`` when there is one; otherwise it answers True for the
    mock itself and NotImplemented for anything else.
    """
    def __eq__(other):
        configured = self.__eq__._mock_return_value
        if configured is DEFAULT:
            return True if self is other else NotImplemented
        return configured
    return __eq__
2109
def _get_ne(self):
    """Build the default ``__ne__`` side effect for the mock `self`.

    The returned function yields `DEFAULT` when a return value has been
    configured on ``self.__ne__``; otherwise it answers False for the mock
    itself and NotImplemented for anything else.
    """
    def __ne__(other):
        if self.__ne__._mock_return_value is not DEFAULT:
            return DEFAULT
        return False if self is other else NotImplemented
    return __ne__
2118
def _get_iter(self):
    """Build the default ``__iter__`` side effect for the mock `self`.

    The returned function iterates over the return value configured on
    ``self.__iter__``, or over an empty list when none is configured.
    """
    def __iter__():
        configured = self.__iter__._mock_return_value
        # if the configured value was already an iterator, then calling
        # iter on it should return the iterator unchanged
        return iter([] if configured is DEFAULT else configured)
    return __iter__
2128
def _get_async_iter(self):
    """Build the default ``__aiter__`` side effect for the mock `self`.

    The returned function wraps, in an `_AsyncIterator`, an iterator over
    the return value configured on ``self.__aiter__`` (or over an empty
    list when none is configured).
    """
    def __aiter__():
        configured = self.__aiter__._mock_return_value
        items = [] if configured is DEFAULT else configured
        return _AsyncIterator(iter(items))
    return __aiter__
2136
# Magic methods whose default behaviour is supplied as a side effect.
# Each value is a factory taking the mock and returning the function to
# install as that method's `side_effect`.
_side_effect_methods = {
    '__eq__': _get_eq,
    '__ne__': _get_ne,
    '__iter__': _get_iter,
    '__aiter__': _get_async_iter
}
2143
2144
2145
def _set_return_value(mock, method, name):
    """Configure the default behaviour of the magic method mock `method`.

    `name` is the dunder name of the method and `mock` the mock it belongs
    to.  The first matching rule wins:

    * a fixed value from `_return_values` becomes the return value;
    * a calculator from `_calculate_return_value` is called with `mock` and
      its result becomes the return value;
    * a factory from `_side_effect_methods` is called with `mock` and its
      result becomes the side effect.

    Names matching none of the tables leave `method` untouched.
    """
    if (fixed := _return_values.get(name, DEFAULT)) is not DEFAULT:
        method.return_value = fixed
    elif (calculator := _calculate_return_value.get(name)) is not None:
        method.return_value = calculator(mock)
    elif (factory := _side_effect_methods.get(name)) is not None:
        method.side_effect = factory(mock)
2161
2162
2163
class MagicMixin(Base):
    """Mixin that installs `MagicProxy` descriptors for the supported magic
    methods on the type of each instance."""

    def __init__(self, /, *args, **kw):
        self._mock_set_magics()  # make magic work for kwargs in init
        _safe_super(MagicMixin, self).__init__(*args, **kw)
        self._mock_set_magics()  # fix magic broken by upper level init


    def _mock_set_magics(self):
        """Bring the magic methods on ``type(self)`` in line with the spec.

        When `_mock_methods` is set, only the magic names listed in it are
        kept and the other ones already present on the type are deleted.
        Wanted names that the type does not define yet get a `MagicProxy`.
        """
        cls = type(self)
        supported = _magics | _async_method_magics
        wanted = supported

        allowed = getattr(self, "_mock_methods", None)
        if allowed is not None:
            wanted = supported.intersection(allowed)

            for entry in supported - wanted:
                if entry in cls.__dict__:
                    # remove unneeded magic methods
                    delattr(self, entry)

        # don't overwrite existing attributes if called a second time
        for entry in wanted - set(cls.__dict__):
            setattr(cls, entry, MagicProxy(entry, self))
2192
2193
2194
class NonCallableMagicMock(MagicMixin, NonCallableMock):
    """A version of `MagicMock` that isn't callable."""
    def mock_add_spec(self, spec, spec_set=False):
        """Add a spec to a mock. `spec` can either be an object or a
        list of strings. Only attributes on the `spec` can be fetched as
        attributes from the mock.

        If `spec_set` is True then only attributes on the spec can be set.

        The magic methods are set up again once the spec has been added."""
        self._mock_add_spec(spec, spec_set)
        self._mock_set_magics()
2205
2206
class AsyncMagicMixin(MagicMixin):
    """`MagicMixin` subclass that adds nothing of its own; `AsyncMock`
    lists it among its bases."""
    pass
2209
2210
class MagicMock(MagicMixin, Mock):
    """
    MagicMock is a subclass of Mock with default implementations
    of most of the magic methods. You can use MagicMock without having to
    configure the magic methods yourself.

    If you use the `spec` or `spec_set` arguments then *only* magic
    methods that exist in the spec will be created.

    Attributes and the return value of a `MagicMock` will also be `MagicMocks`.
    """
    def mock_add_spec(self, spec, spec_set=False):
        """Add a spec to a mock.

        `spec` can either be an object or a list of strings. Only
        attributes on the `spec` can be fetched as attributes from the
        mock. If `spec_set` is True then only attributes on the spec can be
        set. The magic methods are set up again afterwards."""
        self._mock_add_spec(spec, spec_set)
        self._mock_set_magics()

    def reset_mock(self, /, *args, return_value=False, **kwargs):
        """Reset the mock, forwarding all arguments to the parent class.

        A request to reset the return value is dropped when this mock is
        itself a magic method.
        """
        # Don't reset return values for magic methods,
        # otherwise `m.__str__` will start
        # to return `MagicMock` instances, instead of `str` instances.
        keep_return_value = bool(
            return_value
            and self._mock_name
            and _is_magic(self._mock_name)
        )
        super().reset_mock(
            *args,
            return_value=False if keep_return_value else return_value,
            **kwargs
        )
2242
2243
class MagicProxy(Base):
    """Descriptor that creates the mock for one magic method on access."""

    def __init__(self, name, parent):
        # Dunder name of the magic method and the mock it belongs to.
        self.name, self.parent = name, parent

    def create_mock(self):
        """Create the child mock for this magic method, set it on the
        parent, give it its default behaviour and return it."""
        name, owner = self.name, self.parent
        child = owner._get_child_mock(name=name, _new_name=name,
                                      _new_parent=owner)
        setattr(owner, name, child)
        _set_return_value(owner, child, name)
        return child

    def __get__(self, obj, _type=None):
        # Every access through the descriptor builds a new mock; `obj` and
        # `_type` are not used.
        return self.create_mock()
2260
2261
# Signature and attribute names of code objects, used when building the
# fake `__code__` of async mocks.  `_CODE_ATTRS` is only defined when the
# signature could be obtained.
try:
    _CODE_SIG = inspect.signature(partial(CodeType.__init__, None))
except ValueError:
    _CODE_SIG = None
else:
    _CODE_ATTRS = dir(CodeType)
2267
2268
class AsyncMockMixin(Base):
    """Mixin adding await tracking and await assertions to a mock."""

    await_count = _delegating_property('await_count')
    await_args = _delegating_property('await_args')
    await_args_list = _delegating_property('await_args_list')

    def __init__(self, /, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # iscoroutinefunction() checks _is_coroutine property to say if an
        # object is a coroutine. Without this check it looks to see if it is a
        # function/method, which in this case it is not (since it is an
        # AsyncMock).
        # It is set through __dict__ because when spec_set is True, this
        # attribute is likely undefined.
        state = self.__dict__
        state['_is_coroutine'] = asyncio.coroutines._is_coroutine
        state['_mock_await_count'] = 0
        state['_mock_await_args'] = None
        state['_mock_await_args_list'] = _CallList()

        # Stand-in for a code object describing `async def f(*args, **kwargs)`.
        if _CODE_SIG:
            code_mock = NonCallableMock(spec_set=_CODE_ATTRS)
            code_mock.__dict__["_spec_class"] = CodeType
            code_mock.__dict__["_spec_signature"] = _CODE_SIG
        else:
            code_mock = NonCallableMock(spec_set=CodeType)
        code_attributes = (
            ('co_flags', (inspect.CO_COROUTINE
                          + inspect.CO_VARARGS
                          + inspect.CO_VARKEYWORDS)),
            ('co_argcount', 0),
            ('co_varnames', ('args', 'kwargs')),
            ('co_posonlyargcount', 0),
            ('co_kwonlyargcount', 0),
        )
        for attribute, value in code_attributes:
            setattr(code_mock, attribute, value)

        state['__code__'] = code_mock
        state['__name__'] = 'AsyncMock'
        state['__defaults__'] = tuple()
        state['__kwdefaults__'] = {}
        state['__annotations__'] = None

    async def _execute_mock_call(self, /, *args, **kwargs):
        """Record the await, then produce the outcome of the call.

        The side effect is consulted first, then an explicitly configured
        return value, then the wrapped object, and finally the default
        return value.
        """
        # This is nearly just like super(), except for special handling
        # of coroutines

        awaited = _Call((args, kwargs), two=True)
        self.await_count += 1
        self.await_args = awaited
        self.await_args_list.append(awaited)

        effect = self.side_effect
        if effect is not None:
            if _is_exception(effect):
                raise effect

            if not _callable(effect):
                # An iterator of results: take the next one.
                try:
                    outcome = next(effect)
                except StopIteration:
                    # It is impossible to propagate a StopIteration
                    # through coroutines because of PEP 479
                    raise StopAsyncIteration
                if _is_exception(outcome):
                    raise outcome
            elif iscoroutinefunction(effect):
                outcome = await effect(*args, **kwargs)
            else:
                outcome = effect(*args, **kwargs)

            if outcome is not DEFAULT:
                return outcome

        if self._mock_return_value is not DEFAULT:
            return self.return_value

        wrapped = self._mock_wraps
        if wrapped is not None:
            if iscoroutinefunction(wrapped):
                return await wrapped(*args, **kwargs)
            return wrapped(*args, **kwargs)

        return self.return_value

    def assert_awaited(self):
        """
        Assert that the mock was awaited at least once.
        """
        if self.await_count == 0:
            raise AssertionError(
                f"Expected {self._mock_name or 'mock'} to have been awaited."
            )

    def assert_awaited_once(self):
        """
        Assert that the mock was awaited exactly once.
        """
        if self.await_count == 1:
            return
        raise AssertionError(
            f"Expected {self._mock_name or 'mock'} to have been awaited once."
            f" Awaited {self.await_count} times."
        )

    def assert_awaited_with(self, /, *args, **kwargs):
        """
        Assert that the last await was with the specified arguments.
        """
        if self.await_args is None:
            signature = self._format_mock_call_signature(args, kwargs)
            raise AssertionError(f'Expected await: {signature}\nNot awaited')

        expected = self._call_matcher(_Call((args, kwargs), two=True))
        actual = self._call_matcher(self.await_args)
        if actual != expected:
            # An exception produced while matching the expected call is
            # chained as the cause of the assertion failure.
            cause = expected if isinstance(expected, Exception) else None
            message = self._format_mock_failure_message(
                args, kwargs, action='await')
            raise AssertionError(message) from cause

    def assert_awaited_once_with(self, /, *args, **kwargs):
        """
        Assert that the mock was awaited exactly once and with the specified
        arguments.
        """
        if not self.await_count == 1:
            raise AssertionError(
                f"Expected {self._mock_name or 'mock'} to have been awaited once."
                f" Awaited {self.await_count} times."
            )
        return self.assert_awaited_with(*args, **kwargs)

    def assert_any_await(self, /, *args, **kwargs):
        """
        Assert the mock has ever been awaited with the specified arguments.
        """
        expected = self._call_matcher(_Call((args, kwargs), two=True))
        cause = expected if isinstance(expected, Exception) else None
        recorded = [self._call_matcher(c) for c in self.await_args_list]
        if cause or expected not in _AnyComparer(recorded):
            expected_string = self._format_mock_call_signature(args, kwargs)
            raise AssertionError(
                '%s await not found' % expected_string
            ) from cause

    def assert_has_awaits(self, calls, any_order=False):
        """
        Assert the mock has been awaited with the specified calls.
        The :attr:`await_args_list` list is checked for the awaits.

        If `any_order` is False (the default) then the awaits must be
        sequential. There can be extra calls before or after the
        specified awaits.

        If `any_order` is True then the awaits can be in any order, but
        they must all appear in :attr:`await_args_list`.
        """
        expected = [self._call_matcher(c) for c in calls]
        cause = next((e for e in expected if isinstance(e, Exception)), None)
        all_awaits = _CallList(self._call_matcher(c) for c in self.await_args_list)

        if not any_order:
            if expected in all_awaits:
                return
            if cause is None:
                problem = 'Awaits not found.'
            else:
                errors = [e if isinstance(e, Exception) else None
                          for e in expected]
                problem = ('Error processing expected awaits.\n'
                           'Errors: {}').format(errors)
            raise AssertionError(
                f'{problem}\n'
                f'Expected: {_CallList(calls)}\n'
                f'Actual: {self.await_args_list}'
            ) from cause

        # Any order: tick each expected await off a copy of the recorded
        # ones so that repeated awaits must each be matched separately.
        remaining = list(all_awaits)
        not_found = []
        for kall in expected:
            try:
                remaining.remove(kall)
            except ValueError:
                not_found.append(kall)
        if not_found:
            raise AssertionError(
                '%r not all found in await list' % (tuple(not_found),)
            ) from cause

    def assert_not_awaited(self):
        """
        Assert that the mock was never awaited.
        """
        if self.await_count != 0:
            raise AssertionError(
                f"Expected {self._mock_name or 'mock'} to not have been awaited."
                f" Awaited {self.await_count} times."
            )

    def reset_mock(self, /, *args, **kwargs):
        """
        See :func:`.Mock.reset_mock()`

        The await count, last await arguments and await list are cleared
        as well.
        """
        super().reset_mock(*args, **kwargs)
        self.await_count = 0
        self.await_args = None
        self.await_args_list = _CallList()
2467
2468
class AsyncMock(AsyncMockMixin, AsyncMagicMixin, Mock):
    """
    Enhance :class:`Mock` with features allowing to mock
    an async function.

    The :class:`AsyncMock` object will behave so the object is
    recognized as an async function, and the result of a call is an awaitable:

    >>> mock = AsyncMock()
    >>> iscoroutinefunction(mock)
    True
    >>> inspect.isawaitable(mock())
    True


    The result of ``mock()`` is an async function which will have the outcome
    of ``side_effect`` or ``return_value``:

    - if ``side_effect`` is a function, the async function will return the
      result of that function,
    - if ``side_effect`` is an exception, the async function will raise the
      exception,
    - if ``side_effect`` is an iterable, the async function will return the
      next value of the iterable, however, if the sequence of result is
      exhausted, ``StopAsyncIteration`` is raised immediately (a
      ``StopIteration`` cannot propagate out of a coroutine),
    - if ``side_effect`` is not defined, the async function will return the
      value defined by ``return_value``, hence, by default, the async function
      returns a new :class:`AsyncMock` object.

    If the outcome of ``side_effect`` or ``return_value`` is an async function,
    the mock async function obtained when the mock object is called will be this
    async function itself (and not an async function returning an async
    function).

    The test author can also specify a wrapped object with ``wraps``. In this
    case, the :class:`Mock` object behavior is the same as with an
    :class:`.Mock` object: the wrapped object may have methods
    defined as async functions.

    Based on Martin Richard's asynctest project.
    """
2510
2511
class _ANY:
    """A helper object that is equal to whatever it is compared with."""

    def __repr__(self):
        return '<ANY>'

    def __eq__(self, other):
        # Equality never fails, regardless of what `other` is.
        return True

    def __ne__(self, other):
        # Spelled out so that ``!=`` is always the opposite of ``==``.
        return False


# The shared instance exported by this module.
ANY = _ANY()
2525
2526
2527
def _format_call_signature(name, args, kwargs):
    """Return a string that reads like the call ``name(*args, **kwargs)``.

    `args` is an iterable of positional arguments, each rendered with
    `repr()`.  `kwargs` is a mapping of keyword arguments, each rendered as
    ``key=repr(value)``.  All rendered arguments are separated by ``', '``.

    `name` is inserted verbatim and is never used as a format template, so
    a name containing ``%`` (for example ``'100%'``) is rendered as-is
    instead of breaking the formatting.
    """
    args_string = ', '.join(repr(arg) for arg in args)
    kwargs_string = ', '.join(
        '%s=%r' % (key, value) for key, value in kwargs.items()
    )
    # Join only the non-empty halves so no stray separator is produced.
    formatted_args = ', '.join(
        part for part in (args_string, kwargs_string) if part
    )
    # Format name and arguments in a single step: building a template from
    # `name` first would make any '%' in it act as a conversion specifier.
    return '%s(%s)' % (name, formatted_args)
2543
2544
2545
class _Call(tuple):
    """
    A tuple for holding the results of a call to a mock, either in the form
    `(args, kwargs)` or `(name, args, kwargs)`.

    If args or kwargs are empty then a call tuple will compare equal to
    a tuple without those values. This makes comparisons less verbose::

        _Call(('name', (), {})) == ('name',)
        _Call(('name', (1,), {})) == ('name', (1,))
        _Call(((), {'a': 'b'})) == ({'a': 'b'},)

    The `_Call` object provides a useful shortcut for comparing with call::

        _Call(((1, 2), {'a': 3})) == call(1, 2, a=3)
        _Call(('foo', (1, 2), {'a': 3})) == call.foo(1, 2, a=3)

    If the _Call has no name then it will match any name.
    """
    def __new__(cls, value=(), name='', parent=None, two=False,
                from_kall=True):
        # Normalise `value` (0 to 3 items) into name / args / kwargs.
        # Missing pieces default to `name`, () and {}.
        args = ()
        kwargs = {}
        _len = len(value)
        if _len == 3:
            name, args, kwargs = value
        elif _len == 2:
            # (name, args), (name, kwargs) or (args, kwargs): a leading
            # string is taken to be the name.
            first, second = value
            if isinstance(first, str):
                name = first
                if isinstance(second, tuple):
                    args = second
                else:
                    kwargs = second
            else:
                args, kwargs = first, second
        elif _len == 1:
            # a lone item is a name (str), args (tuple) or kwargs (other)
            value, = value
            if isinstance(value, str):
                name = value
            elif isinstance(value, tuple):
                args = value
            else:
                kwargs = value

        # `two` selects the two-item (args, kwargs) layout, dropping the name
        if two:
            return tuple.__new__(cls, (args, kwargs))

        return tuple.__new__(cls, (name, args, kwargs))


    def __init__(self, value=(), name=None, parent=None, two=False,
                 from_kall=True):
        # The tuple contents were fixed in __new__; only the bookkeeping
        # attributes are stored here.  Note that `name` defaults to None
        # here (and to '' in __new__); __call__ and __getattr__ test
        # `_mock_name is None`.
        self._mock_name = name
        self._mock_parent = parent
        self._mock_from_kall = from_kall


    def __eq__(self, other):
        # Anything without a length cannot be a call tuple.
        try:
            len_other = len(other)
        except TypeError:
            return NotImplemented

        # Unpack self: either (args, kwargs) or (name, args, kwargs).
        self_name = ''
        if len(self) == 2:
            self_args, self_kwargs = self
        else:
            self_name, self_args, self_kwargs = self

        # When both sides have a (truthy) parent, the parents must be equal.
        if (getattr(self, '_mock_parent', None) and getattr(other, '_mock_parent', None)
                and self._mock_parent != other._mock_parent):
            return False

        # Unpack `other` using the same shorthand forms accepted by __new__.
        other_name = ''
        if len_other == 0:
            other_args, other_kwargs = (), {}
        elif len_other == 3:
            other_name, other_args, other_kwargs = other
        elif len_other == 1:
            value, = other
            if isinstance(value, tuple):
                other_args = value
                other_kwargs = {}
            elif isinstance(value, str):
                other_name = value
                other_args, other_kwargs = (), {}
            else:
                other_args = ()
                other_kwargs = value
        elif len_other == 2:
            # could be (name, args) or (name, kwargs) or (args, kwargs)
            first, second = other
            if isinstance(first, str):
                other_name = first
                if isinstance(second, tuple):
                    other_args, other_kwargs = second, {}
                else:
                    other_args, other_kwargs = (), second
            else:
                other_args, other_kwargs = first, second
        else:
            return False

        # An empty name on self matches any name on the other side.
        if self_name and other_name != self_name:
            return False

        # this order is important for ANY to work!
        return (other_args, other_kwargs) == (self_args, self_kwargs)


    # Use object's __ne__ rather than the one inherited from tuple.
    __ne__ = object.__ne__


    def __call__(self, /, *args, **kwargs):
        # Calling a call object yields a new _Call recording the arguments.
        if self._mock_name is None:
            return _Call(('', args, kwargs), name='()')

        name = self._mock_name + '()'
        return _Call((self._mock_name, args, kwargs), name=name, parent=self)


    def __getattr__(self, attr):
        # Attribute access builds a dotted name; the result is flagged
        # from_kall=False because no call has been made yet.
        if self._mock_name is None:
            return _Call(name=attr, from_kall=False)
        name = '%s.%s' % (self._mock_name, attr)
        return _Call(name=name, parent=self, from_kall=False)


    def __getattribute__(self, attr):
        # Hide everything defined on tuple itself: raising AttributeError
        # here sends the lookup to __getattr__ above, which returns a _Call
        # for that name instead.
        if attr in tuple.__dict__:
            raise AttributeError
        return tuple.__getattribute__(self, attr)


    def _get_call_arguments(self):
        # Return (args, kwargs) for either tuple layout.
        if len(self) == 2:
            args, kwargs = self
        else:
            name, args, kwargs = self

        return args, kwargs

    @property
    def args(self):
        # positional arguments of the recorded call
        return self._get_call_arguments()[0]

    @property
    def kwargs(self):
        # keyword arguments of the recorded call
        return self._get_call_arguments()[1]

    def __repr__(self):
        # Objects created with from_kall=False render as a bare name,
        # prefixed with 'call' when the name starts with '()'.
        if not self._mock_from_kall:
            name = self._mock_name or 'call'
            if name.startswith('()'):
                name = 'call%s' % name
            return name

        # Otherwise render as a call signature rooted at 'call'.
        if len(self) == 2:
            name = 'call'
            args, kwargs = self
        else:
            name, args, kwargs = self
            if not name:
                name = 'call'
            elif not name.startswith('()'):
                name = 'call.%s' % name
            else:
                name = 'call%s' % name
        return _format_call_signature(name, args, kwargs)


    def call_list(self):
        """For a call object that represents multiple calls, `call_list`
        returns a list of all the intermediate calls as well as the
        final call."""
        # Walk up the parent chain, keeping only entries flagged
        # from_kall, then reverse so the outermost call comes first.
        vals = []
        thing = self
        while thing is not None:
            if thing._mock_from_kall:
                vals.append(thing)
            thing = thing._mock_parent
        return _CallList(reversed(vals))


# Module-level root call object, created with from_kall=False.
call = _Call(from_kall=False)
2732
2733
def create_autospec(spec, spec_set=False, instance=False, _parent=None,
                    _name=None, *, unsafe=False, **kwargs):
    """Create a mock object using another object as a spec. Attributes on the
    mock will use the corresponding attribute on the `spec` object as their
    spec.

    Functions or methods being mocked will have their arguments checked
    to check that they are called with the correct signature.

    If `spec_set` is True then attempting to set attributes that don't exist
    on the spec object will raise an `AttributeError`.

    If a class is used as a spec then the return value of the mock (the
    instance of the class) will have the same spec. You can use a class as the
    spec for an instance object by passing `instance=True`. The returned mock
    will only be callable if instances of the mock are callable.

    `create_autospec` will raise a `RuntimeError` if passed some common
    misspellings of the arguments autospec and spec_set. Pass the argument
    `unsafe` with the value True to disable that check.

    `create_autospec` also takes arbitrary keyword arguments that are passed to
    the constructor of the created mock.

    Raises `InvalidSpecError` if `spec` is itself a mock, and `RuntimeError`
    if `instance` is true while `spec` is an async function.

    `_parent` and `_name` are used by the recursive call that builds the
    return value of a class mock."""
    if _is_list(spec):
        # can't pass a list instance to the mock constructor as it will be
        # interpreted as a list of strings
        spec = type(spec)

    is_type = isinstance(spec, type)
    if _is_instance_mock(spec):
        raise InvalidSpecError(f'Cannot autospec a Mock object. '
                               f'[object={spec!r}]')
    is_async_func = _is_async_func(spec)
    # _kwargs collects the constructor arguments for the mock being built
    _kwargs = {'spec': spec}
    if spec_set:
        _kwargs = {'spec_set': spec}
    elif spec is None:
        # None we mock with a normal mock without a spec
        _kwargs = {}
    if _kwargs and instance:
        _kwargs['_spec_as_instance'] = True
    if not unsafe:
        _check_spec_arg_typos(kwargs)

    # an explicit name= keyword takes precedence over _name
    _name = kwargs.pop('name', _name)
    _new_name = _name
    if _parent is None:
        # for a top level object no _new_name should be set
        _new_name = ''

    _kwargs.update(kwargs)

    # Pick the mock class based on what kind of object the spec is.
    Klass = MagicMock
    if inspect.isdatadescriptor(spec):
        # descriptors don't have a spec
        # because we don't know what type they return
        _kwargs = {}
    elif is_async_func:
        if instance:
            raise RuntimeError("Instance can not be True when create_autospec "
                               "is mocking an async function")
        Klass = AsyncMock
    elif not _callable(spec):
        Klass = NonCallableMagicMock
    elif is_type and instance and not _instance_callable(spec):
        Klass = NonCallableMagicMock

    mock = Klass(parent=_parent, _new_parent=_parent, _new_name=_new_name,
                 name=_name, **_kwargs)

    if isinstance(spec, FunctionTypes):
        # should only happen at the top level because we don't
        # recurse for functions
        if is_async_func:
            mock = _set_async_signature(mock, spec)
        else:
            mock = _set_signature(mock, spec)
    else:
        _check_signature(spec, mock, is_type, instance)

    if _parent is not None and not instance:
        _parent._mock_children[_name] = mock

    # Pop wraps from kwargs because it must not be passed to configure_mock.
    wrapped = kwargs.pop('wraps', None)
    if is_type and not instance and 'return_value' not in kwargs:
        # For a class spec, calling the mock returns an autospecced instance
        # unless the caller supplied an explicit return_value.
        mock.return_value = create_autospec(spec, spec_set, instance=True,
                                            _name='()', _parent=mock,
                                            wraps=wrapped)

    for entry in dir(spec):
        if _is_magic(entry):
            # MagicMock already does the useful magic methods for us
            continue

        # XXXX do we need a better way of getting attributes without
        # triggering code execution (?) Probably not - we need the actual
        # object to mock it so we would rather trigger a property than mock
        # the property descriptor. Likewise we want to mock out dynamically
        # provided attributes.
        # XXXX what about attributes that raise exceptions other than
        # AttributeError on being fetched?
        # we could be resilient against it, or catch and propagate the
        # exception when the attribute is fetched from the mock
        try:
            original = getattr(spec, entry)
        except AttributeError:
            continue

        child_kwargs = {'spec': original}
        # Wrap child attributes also.
        if wrapped and hasattr(wrapped, entry):
            child_kwargs.update(wraps=original)
        # NOTE(review): with spec_set true the dict is rebuilt here, so a
        # `wraps` entry added just above is discarded - confirm intended.
        if spec_set:
            child_kwargs = {'spec_set': original}

        if not isinstance(original, FunctionTypes):
            # Non-function attributes are only recorded as a _SpecState in
            # _mock_children; no child mock is created for them here.
            # NOTE(review): `instance` is passed as the fifth positional
            # argument, which binds to _SpecState's `ids` parameter rather
            # than its `instance` parameter - confirm intended.
            new = _SpecState(original, spec_set, mock, entry, instance)
            mock._mock_children[entry] = new
        else:
            parent = mock
            if isinstance(spec, FunctionTypes):
                parent = mock.mock

            skipfirst = _must_skip(spec, entry, is_type)
            child_kwargs['_eat_self'] = skipfirst
            if iscoroutinefunction(original):
                child_klass = AsyncMock
            else:
                child_klass = MagicMock
            new = child_klass(parent=parent, name=entry, _new_name=entry,
                              _new_parent=parent, **child_kwargs)
            mock._mock_children[entry] = new
            new.return_value = child_klass()
            _check_signature(original, new, skipfirst=skipfirst)

        # so functions created with _set_signature become instance attributes,
        # *plus* their underlying mock exists in _mock_children of the parent
        # mock. Adding to _mock_children may be unnecessary where we are also
        # setting as an instance attribute?
        if isinstance(new, FunctionTypes):
            setattr(mock, entry, new)
    # kwargs are passed with respect to the parent mock so, they are not used
    # for creating return_value of the parent mock. So, this condition
    # should be true only for the parent mock if kwargs are given.
    if _is_instance_mock(mock) and kwargs:
        mock.configure_mock(**kwargs)

    return mock
2883
2884
def _must_skip(spec, entry, is_type):
    """
    Decide whether the first argument of spec's `entry` attribute has to be
    skipped.

    `is_type` is what gets returned for a plain function found on a class in
    the MRO, and for an attribute that no class in the MRO defines. Instance
    attributes, staticmethods, classmethods and any other kind of class
    attribute give False.
    """
    if isinstance(spec, type):
        klasses = spec.__mro__
    else:
        if entry in getattr(spec, '__dict__', {}):
            # stored on the instance itself - nothing to skip
            return False
        klasses = spec.__class__.__mro__

    for klass in klasses:
        found = klass.__dict__.get(entry, DEFAULT)
        if found is DEFAULT:
            # not defined on this class, keep walking the MRO
            continue
        # The first class that defines the attribute settles the answer.
        if isinstance(found, (staticmethod, classmethod)):
            return False
        if isinstance(found, FunctionTypes):
            # Normal method => skip if looked up on type
            # (if looked up on instance, self is already skipped)
            return is_type
        return False

    # no class defines it: the function is a dynamically provided attribute
    return is_type
2911
2912
class _SpecState:
    """Plain attribute holder for the values given to the constructor.

    It stores `spec`, `spec_set`, `parent`, `name`, `ids` and `instance`
    unchanged under attributes of the same names.
    """

    def __init__(self, spec, spec_set=False, parent=None,
                 name=None, ids=None, instance=False):
        self.spec, self.ids = spec, ids
        self.spec_set, self.parent = spec_set, parent
        self.instance, self.name = instance, name
2923
2924
# Types that this module's isinstance() checks treat as "functions":
# the type of a plain function and the type of a bound method.
FunctionTypes = (
    # python function
    type(create_autospec),
    # instance method
    type(ANY.__eq__),
)


# Attribute-name lists used as specs by mock_open(). They stay None until
# mock_open() fills them in on first use.
file_spec = None
open_spec = None
2935
2936
def _to_stream(read_data):
    """Wrap `read_data` in an in-memory stream.

    `bytes` input gives an `io.BytesIO`; everything else is handed to
    `io.StringIO`.
    """
    stream_type = io.BytesIO if isinstance(read_data, bytes) else io.StringIO
    return stream_type(read_data)
2942
2943
def mock_open(mock=None, read_data=''):
    """
    A helper function to create a mock to replace the use of `open`. It works
    for `open` called directly or used as a context manager.

    The `mock` argument is the mock object to configure. If `None` (the
    default) then a `MagicMock` will be created for you, with the API limited
    to methods or attributes available on standard file handles.

    `read_data` is a string for the `read`, `readline` and `readlines` of the
    file handle to return.  This is an empty string by default.  A `bytes`
    value is also accepted, in which case the data is served from a
    `BytesIO` stream instead of a `StringIO` one.

    Returns the configured `mock`; calling it returns the file handle mock.
    """
    _read_data = _to_stream(read_data)
    # Mutable cell shared by the closures below:
    #   _state[0] - the stream currently backing the handle
    #   _state[1] - the generator installed as readline's side effect
    _state = [_read_data, None]

    # Each reader prefers an explicitly configured return_value and only
    # falls back to the backing stream while return_value is still None.
    def _readlines_side_effect(*args, **kwargs):
        if handle.readlines.return_value is not None:
            return handle.readlines.return_value
        return _state[0].readlines(*args, **kwargs)

    def _read_side_effect(*args, **kwargs):
        if handle.read.return_value is not None:
            return handle.read.return_value
        return _state[0].read(*args, **kwargs)

    def _readline_side_effect(*args, **kwargs):
        # Generator: first yields the lines produced by iteration, then
        # keeps yielding whatever readline() on the stream returns.
        yield from _iter_side_effect()
        while True:
            yield _state[0].readline(*args, **kwargs)

    def _iter_side_effect():
        if handle.readline.return_value is not None:
            while True:
                yield handle.readline.return_value
        for line in _state[0]:
            yield line

    def _next_side_effect():
        if handle.readline.return_value is not None:
            return handle.readline.return_value
        return next(_state[0])

    def _exit_side_effect(exctype, excinst, exctb):
        # leaving the `with` block closes the handle
        handle.close()

    # The spec lists are module-level and computed on first use only.
    global file_spec
    if file_spec is None:
        import _io
        file_spec = list(set(dir(_io.TextIOWrapper)).union(set(dir(_io.BytesIO))))

    global open_spec
    if open_spec is None:
        import _io
        open_spec = list(set(dir(_io.open)))
    if mock is None:
        mock = MagicMock(name='open', spec=open_spec)

    handle = MagicMock(spec=file_spec)
    handle.__enter__.return_value = handle

    # None marks "not configured by the user" for the readers above.
    handle.write.return_value = None
    handle.read.return_value = None
    handle.readline.return_value = None
    handle.readlines.return_value = None

    handle.read.side_effect = _read_side_effect
    _state[1] = _readline_side_effect()
    handle.readline.side_effect = _state[1]
    handle.readlines.side_effect = _readlines_side_effect
    handle.__iter__.side_effect = _iter_side_effect
    handle.__next__.side_effect = _next_side_effect
    handle.__exit__.side_effect = _exit_side_effect

    def reset_data(*args, **kwargs):
        # Installed as the side effect of `mock`, so it runs on every call:
        # start again from a fresh stream over the original read_data.
        _state[0] = _to_stream(read_data)
        if handle.readline.side_effect == _state[1]:
            # Only reset the side effect if the user hasn't overridden it.
            _state[1] = _readline_side_effect()
            handle.readline.side_effect = _state[1]
        # presumably DEFAULT makes the mock fall back to its return_value
        # (the handle) - confirm against the side_effect handling
        return DEFAULT

    mock.side_effect = reset_data
    mock.return_value = handle
    return mock
3028
3029
class PropertyMock(Mock):
    """
    A mock meant to stand in for a property, or another descriptor, on a
    class. `PropertyMock` implements `__get__` and `__set__`, so a return
    value can be specified for when it is fetched.

    Fetching a `PropertyMock` instance from an object calls the mock with
    no args. Setting it calls the mock with the value being set.
    """

    def __get__(self, obj, obj_type=None):
        # Attribute fetch: the result of calling the mock without arguments.
        return self()

    def __set__(self, obj, val):
        # Attribute assignment: recorded as a call with the assigned value.
        self(val)

    def _get_child_mock(self, /, **kwargs):
        # Children are MagicMock instances, not further PropertyMocks.
        return MagicMock(**kwargs)
3046
3047
# Default for `timeout` parameters: lets "no timeout given" be told apart
# from an explicit None.
_timeout_unset = sentinel.TIMEOUT_UNSET
3049
class ThreadingMixin(Base):
    """
    Mixin that signals `threading.Event` objects whenever the mock is
    called, so that other code can block until a call has happened.
    """

    DEFAULT_TIMEOUT = None

    def _get_child_mock(self, /, **kw):
        # Children take the wait timeout of the first of `parent` /
        # `_new_parent` that is itself a ThreadingMixin.
        for key in ("parent", "_new_parent"):
            if isinstance(kw.get(key), ThreadingMixin):
                kw["timeout"] = kw[key]._mock_wait_timeout
                break
        return super()._get_child_mock(**kw)

    def __init__(self, *args, timeout=_timeout_unset, **kwargs):
        super().__init__(*args, **kwargs)
        if timeout is _timeout_unset:
            timeout = self.DEFAULT_TIMEOUT
        # Written straight into the instance dict, in this order.
        self.__dict__.update(
            _mock_event=threading.Event(),  # Event for any call
            _mock_calls_events=[],  # Events for each of the calls
            _mock_calls_events_lock=threading.Lock(),
            _mock_wait_timeout=timeout,
        )

    def reset_mock(self, /, *args, **kwargs):
        """
        See :func:`.Mock.reset_mock()`
        """
        super().reset_mock(*args, **kwargs)
        # Start over with fresh, unset events.
        self.__dict__.update(
            _mock_event=threading.Event(),
            _mock_calls_events=[],
        )

    def __get_event(self, expected_args, expected_kwargs):
        # Return the event registered for this exact (args, kwargs) pair,
        # registering a new one when none exists yet.
        with self._mock_calls_events_lock:
            for args, kwargs, event in self._mock_calls_events:
                if (args, kwargs) == (expected_args, expected_kwargs):
                    return event
            fresh = threading.Event()
            self._mock_calls_events.append(
                (expected_args, expected_kwargs, fresh))
            return fresh

    def _mock_call(self, *args, **kwargs):
        ret_value = super()._mock_call(*args, **kwargs)

        # Signal first the event for this call's arguments, then the
        # "called at all" event.
        self.__get_event(args, kwargs).set()
        self._mock_event.set()

        return ret_value

    def wait_until_called(self, *, timeout=_timeout_unset):
        """Wait until the mock object is called.

        `timeout` - time to wait for in seconds, waits forever otherwise.
        Defaults to the constructor provided timeout.
        Use None to block indefinitely.

        Raises `AssertionError` if the wait times out.
        """
        if timeout is _timeout_unset:
            timeout = self._mock_wait_timeout
        if self._mock_event.wait(timeout=timeout):
            return
        raise AssertionError(
            f"{self._mock_name or 'mock'} was not called before"
            f" timeout({timeout}).")

    def wait_until_any_call_with(self, *args, **kwargs):
        """Wait until the mock object is called with given args.

        Waits for the timeout in seconds provided in the constructor.

        Raises `AssertionError` if the wait times out.
        """
        if self.__get_event(args, kwargs).wait(timeout=self._mock_wait_timeout):
            return
        expected_string = self._format_mock_call_signature(args, kwargs)
        raise AssertionError(f'{expected_string} call not found')
3120
3121
class ThreadingMock(ThreadingMixin, MagicMixin, Mock):
    """
    A mock that can be used to wait for calls happening
    in a different thread.

    The constructor can take a `timeout` argument which
    controls the timeout in seconds for all `wait` calls of the mock.

    You can change the default timeout of all instances via the
    `ThreadingMock.DEFAULT_TIMEOUT` attribute.

    If no timeout is set, it will block indefinitely.
    """
    pass
3136
3137
def seal(mock):
    """Disable the automatic generation of child mocks.

    Given an input Mock, seals it to ensure no further mocks will be generated
    when accessing an attribute that was not already defined.

    Sealing is recursive: it covers the mock passed in, any mocks generated
    by accessing one of its attributes, and all assigned mocks without a
    name or spec.
    """
    mock._mock_sealed = True
    for attr in dir(mock):
        try:
            child = getattr(mock, attr)
        except AttributeError:
            continue
        # Recurse only into mock attributes that are not recorded as a
        # _SpecState and whose new parent is `mock` itself.
        if (isinstance(child, NonCallableMock)
                and not isinstance(child._mock_children.get(attr), _SpecState)
                and child._mock_new_parent is mock):
            seal(child)
3160
3161
class _AsyncIterator:
    """
    Asynchronous-iterator wrapper around an ordinary iterator.
    """
    def __init__(self, iterator):
        self.iterator = iterator
        # Give the instance a `__code__` attribute whose co_flags carry
        # CO_ITERABLE_COROUTINE.
        fake_code = NonCallableMock(spec_set=CodeType)
        fake_code.co_flags = inspect.CO_ITERABLE_COROUTINE
        self.__dict__['__code__'] = fake_code

    async def __anext__(self):
        # A private marker tells "iterator exhausted" apart from any value
        # the iterator may legitimately produce.
        exhausted = object()
        item = next(self.iterator, exhausted)
        if item is exhausted:
            raise StopAsyncIteration
        return item
3178