1 # mock.py
2 # Test tools for mocking and patching.
3 # Maintained by Michael Foord
4 # Backport for other versions of Python available from
5 # https://pypi.org/project/mock
6
# Names exported by a star-import of this module; this tuple is the
# module's declared public API.
__all__ = (
    'Mock',
    'MagicMock',
    'patch',
    'sentinel',
    'DEFAULT',
    'ANY',
    'call',
    'create_autospec',
    'AsyncMock',
    'FILTER_DIR',
    'NonCallableMock',
    'NonCallableMagicMock',
    'mock_open',
    'PropertyMock',
    'seal',
)
24
25
26 import asyncio
27 import contextlib
28 import io
29 import inspect
30 import pprint
31 import sys
32 import builtins
33 import pkgutil
34 from asyncio import iscoroutinefunction
35 from types import CodeType, ModuleType, MethodType
36 from unittest.util import safe_repr
37 from functools import wraps, partial
38 from threading import RLock
39
40
class InvalidSpecError(Exception):
    """Raised when a value that cannot serve as a mock spec is supplied."""
43
44
# Names defined by the builtins module that do not start with an underscore.
_builtins = {name for name in dir(builtins) if not name.startswith('_')}

# Consulted by NonCallableMock.__dir__: when false, dir() on a mock returns
# the unfiltered object.__dir__() result.
FILTER_DIR = True

# Workaround for issue #12370
# Without this, the __class__ properties wouldn't be set correctly
_safe_super = super
52
def _is_async_obj(obj):
    """Return whether *obj* is a coroutine function or an awaitable.

    Mock instances that are not AsyncMock instances are never reported as
    async.  When *obj* exposes ``__func__`` the check is applied to that
    underlying object instead.
    """
    if _is_instance_mock(obj) and not isinstance(obj, AsyncMock):
        return False
    target = obj.__func__ if hasattr(obj, '__func__') else obj
    return iscoroutinefunction(target) or inspect.isawaitable(target)
59
60
61 def _is_async_func(func):
62 if getattr(func, '__code__', None):
63 return iscoroutinefunction(func)
64 else:
65 return False
66
67
def _is_instance_mock(obj):
    """Return True if the real type of *obj* derives from NonCallableMock."""
    # isinstance() is unreliable for mocks because they override __class__,
    # so the actual type is inspected.  NonCallableMock is the common base
    # class of every mock.
    actual_type = type(obj)
    return issubclass(actual_type, NonCallableMock)
72
73
74 def _is_exception(obj):
75 return (
76 isinstance(obj, BaseException) or
77 isinstance(obj, type) and issubclass(obj, BaseException)
78 )
79
80
def _extract_mock(obj):
    """Return the mock object behind *obj*.

    An autospecced function is a FunctionType carrying the real mock in its
    ``mock`` attribute; for such objects that attribute is returned.  Any
    other object is returned unchanged.
    """
    wraps_a_mock = isinstance(obj, FunctionTypes) and hasattr(obj, 'mock')
    return obj.mock if wraps_a_mock else obj
88
89
90 def _get_signature_object(func, as_instance, eat_self):
91 """
92 Given an arbitrary, possibly callable object, try to create a suitable
93 signature object.
94 Return a (reduced func, signature) tuple, or None.
95 """
96 if isinstance(func, type) and not as_instance:
97 # If it's a type and should be modelled as a type, use __init__.
98 func = func.__init__
99 # Skip the `self` argument in __init__
100 eat_self = True
101 elif isinstance(func, (classmethod, staticmethod)):
102 if isinstance(func, classmethod):
103 # Skip the `cls` argument of a class method
104 eat_self = True
105 # Use the original decorated method to extract the correct function signature
106 func = func.__func__
107 elif not isinstance(func, FunctionTypes):
108 # If we really want to model an instance of the passed type,
109 # __call__ should be looked up, not __init__.
110 try:
111 func = func.__call__
112 except AttributeError:
113 return None
114 if eat_self:
115 sig_func = partial(func, None)
116 else:
117 sig_func = func
118 try:
119 return func, inspect.signature(sig_func)
120 except ValueError:
121 # Certain callable types are not supported by inspect.signature()
122 return None
123
124
def _check_signature(func, mock, skipfirst, instance=False):
    """Attach signature checking derived from *func* to the type of *mock*.

    Does nothing when no signature can be obtained for *func*.  Otherwise
    the type of *mock* receives a ``_mock_check_sig`` callable that binds
    its arguments against the signature, and a matching ``__signature__``.
    """
    result = _get_signature_object(func, instance, skipfirst)
    if result is None:
        return
    func, sig = result

    def checksig(self, /, *args, **kwargs):
        sig.bind(*args, **kwargs)

    _copy_func_details(func, checksig)
    mock_type = type(mock)
    mock_type._mock_check_sig = checksig
    mock_type.__signature__ = sig
135
136
137 def _copy_func_details(func, funcopy):
138 # we explicitly don't copy func.__dict__ into this copy as it would
139 # expose original attributes that should be mocked
140 for attribute in (
141 '__name__', '__doc__', '__text_signature__',
142 '__module__', '__defaults__', '__kwdefaults__',
143 ):
144 try:
145 setattr(funcopy, attribute, getattr(func, attribute))
146 except AttributeError:
147 pass
148
149
150 def _callable(obj):
151 if isinstance(obj, type):
152 return True
153 if isinstance(obj, (staticmethod, classmethod, MethodType)):
154 return _callable(obj.__func__)
155 if getattr(obj, '__call__', None) is not None:
156 return True
157 return False
158
159
160 def _is_list(obj):
161 # checks for list or tuples
162 # XXXX badly named!
163 return type(obj) in (list, tuple)
164
165
166 def _instance_callable(obj):
167 """Given an object, return True if the object is callable.
168 For classes, return True if instances would be callable."""
169 if not isinstance(obj, type):
170 # already an instance
171 return getattr(obj, '__call__', None) is not None
172
173 # *could* be broken by a class overriding __mro__ or __dict__ via
174 # a metaclass
175 for base in (obj,) + obj.__mro__:
176 if base.__dict__.get('__call__') is not None:
177 return True
178 return False
179
180
def _set_signature(mock, original, instance=False):
    """Return a real function that fronts *mock* with *original*'s signature.

    The generated function has the signature ``(*args, **kwargs)``; it first
    binds its arguments against the signature obtained for *original* (so
    a bad call raises TypeError) and then delegates to *mock*.  If no
    signature can be obtained, *mock* itself is returned unchanged.

    The function is created with ``exec`` under the name of *original*.
    Names that are not identifiers fall back to ``'funcopy'``.  Names that
    are reserved keywords, or that collide with the helper names living in
    the exec namespace (``mock`` and ``_checksig_``), are also compiled as
    ``funcopy`` and renamed afterwards: a keyword would be a SyntaxError,
    and a colliding name would overwrite the helper so the generated
    function would call itself forever.
    """
    import keyword

    skipfirst = isinstance(original, type)
    result = _get_signature_object(original, instance, skipfirst)
    if result is None:
        return mock
    func, sig = result
    def checksig(*args, **kwargs):
        sig.bind(*args, **kwargs)
    _copy_func_details(func, checksig)

    context = {'_checksig_': checksig, 'mock': mock}
    name = original.__name__
    def_name = name
    if not name.isidentifier():
        name = def_name = 'funcopy'
    elif keyword.iskeyword(name) or name in context:
        # Cannot be used verbatim in the generated ``def`` statement.
        def_name = 'funcopy'
    src = """def %s(*args, **kwargs):
    _checksig_(*args, **kwargs)
    return mock(*args, **kwargs)""" % def_name
    exec (src, context)
    funcopy = context[def_name]
    if def_name != name:
        # Keep the name the generated function would otherwise have had.
        funcopy.__name__ = funcopy.__qualname__ = name
    _setup_func(funcopy, mock, sig)
    return funcopy
206
207
def _setup_func(funcopy, mock, sig):
    """Turn the function *funcopy* into a front for *mock*.

    *funcopy* receives the call-recording attributes, the assertion
    helpers (each forwarding to the method of the same name on *mock*), a
    ``reset_mock`` helper and ``__signature__``.  Finally *funcopy* is
    registered as the delegate of *mock*.
    """
    funcopy.mock = mock

    def assert_called_with(*args, **kwargs):
        return mock.assert_called_with(*args, **kwargs)

    def assert_called_once_with(*args, **kwargs):
        return mock.assert_called_once_with(*args, **kwargs)

    def assert_has_calls(*args, **kwargs):
        return mock.assert_has_calls(*args, **kwargs)

    def assert_any_call(*args, **kwargs):
        return mock.assert_any_call(*args, **kwargs)

    def assert_called(*args, **kwargs):
        return mock.assert_called(*args, **kwargs)

    def assert_not_called(*args, **kwargs):
        return mock.assert_not_called(*args, **kwargs)

    def assert_called_once(*args, **kwargs):
        return mock.assert_called_once(*args, **kwargs)

    def reset_mock():
        funcopy.method_calls = _CallList()
        funcopy.mock_calls = _CallList()
        mock.reset_mock()
        ret = funcopy.return_value
        if _is_instance_mock(ret) and ret is not mock:
            ret.reset_mock()

    # Fresh call-recording state.
    funcopy.called = False
    funcopy.call_count = 0
    funcopy.call_args = None
    funcopy.call_args_list = _CallList()
    funcopy.method_calls = _CallList()
    funcopy.mock_calls = _CallList()

    # Read from the mock before it starts delegating to funcopy.
    funcopy.return_value = mock.return_value
    funcopy.side_effect = mock.side_effect
    funcopy._mock_children = mock._mock_children

    helpers = (
        assert_called_with, assert_called_once_with, assert_has_calls,
        assert_any_call, reset_mock, assert_called, assert_not_called,
        assert_called_once,
    )
    for helper in helpers:
        setattr(funcopy, helper.__name__, helper)
    funcopy.__signature__ = sig

    mock._mock_delegate = funcopy
255
256
def _setup_async_mock(mock):
    """Give *mock* the await-tracking attributes and await assertions."""
    mock._is_coroutine = asyncio.coroutines._is_coroutine
    mock.await_count = 0
    mock.await_args = None
    mock.await_args_list = _CallList()

    # Mock is not configured yet so the attributes are set
    # to a function and then the corresponding mock helper function
    # is called when the helper is accessed similar to _setup_func.
    def wrapper(attr, /, *args, **kwargs):
        return getattr(mock.mock, attr)(*args, **kwargs)

    await_assertions = (
        'assert_awaited',
        'assert_awaited_once',
        'assert_awaited_with',
        'assert_awaited_once_with',
        'assert_any_await',
        'assert_has_awaits',
        'assert_not_awaited',
    )
    for assertion_name in await_assertions:
        # Binding the name through partial() avoids the late-binding trap:
        # a plain closure over the loop variable would always see the last
        # name in the tuple.
        setattr(mock, assertion_name, partial(wrapper, assertion_name))
282
283
284 def _is_magic(name):
285 return '__%s__' % name[2:-2] == name
286
287
288 class ESC[4;38;5;81m_SentinelObject(ESC[4;38;5;149mobject):
289 "A unique, named, sentinel object."
290 def __init__(self, name):
291 self.name = name
292
293 def __repr__(self):
294 return 'sentinel.%s' % self.name
295
296 def __reduce__(self):
297 return 'sentinel.%s' % self.name
298
299
class _Sentinel(object):
    """Factory of named sentinels: each attribute name maps to one object."""
    def __init__(self):
        self._sentinels = {}

    def __getattr__(self, name):
        if name == '__bases__':
            # Without this help(unittest.mock) raises an exception
            raise AttributeError
        candidate = _SentinelObject(name)
        # setdefault keeps the first object registered under this name.
        return self._sentinels.setdefault(name, candidate)

    def __reduce__(self):
        return 'sentinel'
313
314
# Module-wide sentinel factory: accessing an attribute yields the named
# sentinel object registered under that attribute name.
sentinel = _Sentinel()

# Initial value of Base._mock_return_value; a return value that is still
# DEFAULT is replaced by a child mock when it is first read.
DEFAULT = sentinel.DEFAULT
# Default handed to dict.get() in __delattr__ to detect an absent child.
_missing = sentinel.MISSING
# Stored in _mock_children in place of an attribute that was deleted.
_deleted = sentinel.DELETED


# Attribute names that NonCallableMock.__setattr__ passes straight to
# object.__setattr__.  _delegating_property() adds further names.
_allowed_names = {
    'return_value', '_mock_return_value', 'side_effect',
    '_mock_side_effect', '_mock_parent', '_mock_new_parent',
    '_mock_name', '_mock_new_name'
}
327
328
def _delegating_property(name):
    """Build a property for *name* that defers to the mock's delegate.

    While ``_mock_delegate`` is None the value lives on the mock under
    ``'_mock_' + name``; otherwise reads and writes go to the attribute
    *name* of the delegate.  *name* is also added to ``_allowed_names``.
    """
    _allowed_names.add(name)
    _the_name = '_mock_' + name

    def _get(self, name=name, _the_name=_the_name):
        delegate = self._mock_delegate
        if delegate is not None:
            return getattr(delegate, name)
        return getattr(self, _the_name)

    def _set(self, value, name=name, _the_name=_the_name):
        delegate = self._mock_delegate
        if delegate is not None:
            setattr(delegate, name, value)
        else:
            # Written directly into the instance dict.
            self.__dict__[_the_name] = value

    return property(_get, _set)
345
346
347
348 class ESC[4;38;5;81m_CallList(ESC[4;38;5;149mlist):
349
350 def __contains__(self, value):
351 if not isinstance(value, list):
352 return list.__contains__(self, value)
353 len_value = len(value)
354 len_self = len(self)
355 if len_value > len_self:
356 return False
357
358 for i in range(0, len_self - len_value + 1):
359 sub_list = self[i:i+len_value]
360 if sub_list == value:
361 return True
362 return False
363
364 def __repr__(self):
365 return pprint.pformat(list(self))
366
367
def _check_and_set_parent(parent, value, name, new_name):
    """Adopt *value* as a child of *parent* when that is permissible.

    Returns False, leaving *value* untouched, when *value* is not a mock,
    when it already has a name or a parent, or when it is *parent* itself
    or one of its ancestors.  Otherwise the parent/name links selected by
    *new_name* and *name* are set and True is returned.
    """
    value = _extract_mock(value)

    if not _is_instance_mock(value):
        return False

    already_owned = (
        (value._mock_name or value._mock_new_name) or
        (value._mock_parent is not None) or
        (value._mock_new_parent is not None)
    )
    if already_owned:
        return False

    # setting a mock (value) as a child or return value of itself
    # should not modify the mock
    ancestor = parent
    while ancestor is not None:
        if ancestor is value:
            return False
        ancestor = ancestor._mock_new_parent

    if new_name:
        value._mock_new_parent = parent
        value._mock_new_name = new_name
    if name:
        value._mock_parent = parent
        value._mock_name = name
    return True
393
394 # Internal class to identify if we wrapped an iterator object or not.
395 class ESC[4;38;5;81m_MockIter(ESC[4;38;5;149mobject):
396 def __init__(self, obj):
397 self.obj = iter(obj)
398 def __next__(self):
399 return next(self.obj)
400
class Base(object):
    """Root of the mock classes: class-level defaults and a no-op init."""
    _mock_return_value = DEFAULT
    _mock_side_effect = None

    def __init__(self, /, *args, **kwargs):
        """Accept and ignore any arguments."""
406
407
408
409 class ESC[4;38;5;81mNonCallableMock(ESC[4;38;5;149mBase):
410 """A non-callable version of `Mock`"""
411
412 # Store a mutex as a class attribute in order to protect concurrent access
413 # to mock attributes. Using a class attribute allows all NonCallableMock
414 # instances to share the mutex for simplicity.
415 #
416 # See https://github.com/python/cpython/issues/98624 for why this is
417 # necessary.
418 _lock = RLock()
419
def __new__(cls, /, *args, **kw):
    """Create the instance from a brand-new subclass of *cls*.

    Each instance gets its own class so that magic methods can be set on
    the class without affecting other mocks.  When *cls* is not already
    async and the ``spec_set``/``spec`` argument is an async object,
    AsyncMockMixin is placed in front of *cls* in the bases.
    """
    bases = (cls,)
    if not issubclass(cls, AsyncMockMixin):
        # Check if spec is an async object or function
        bound = _MOCK_SIG.bind_partial(cls, *args, **kw).arguments
        spec_arg = bound.get('spec_set', bound.get('spec'))
        if spec_arg is not None and _is_async_obj(spec_arg):
            bases = (AsyncMockMixin, cls)
    per_instance_class = type(cls.__name__, bases, {'__doc__': cls.__doc__})
    return _safe_super(NonCallableMock, cls).__new__(per_instance_class)
434
435
def __init__(
        self, spec=None, wraps=None, name=None, spec_set=None,
        parent=None, _spec_state=None, _new_name='', _new_parent=None,
        _spec_as_instance=False, _eat_self=None, unsafe=False, **kwargs
    ):
    """Initialise the bookkeeping state of the mock.

    State is written directly into ``self.__dict__`` instead of going
    through attribute assignment.  Any extra keyword arguments are handed
    to ``configure_mock()``.
    """
    # The "new" parent defaults to the regular parent.
    if _new_parent is None:
        _new_parent = parent

    __dict__ = self.__dict__
    __dict__['_mock_parent'] = parent
    __dict__['_mock_name'] = name
    __dict__['_mock_new_name'] = _new_name
    __dict__['_mock_new_parent'] = _new_parent
    __dict__['_mock_sealed'] = False

    # spec_set doubles as the spec itself; from here on `spec` holds the
    # spec object and `spec_set` is just a flag.
    if spec_set is not None:
        spec = spec_set
        spec_set = True
    # By default the first argument is skipped only for mocks that have a
    # parent.
    if _eat_self is None:
        _eat_self = parent is not None

    self._mock_add_spec(spec, spec_set, _spec_as_instance, _eat_self)

    __dict__['_mock_children'] = {}
    __dict__['_mock_wraps'] = wraps
    __dict__['_mock_delegate'] = None

    # Backing slots for the delegating call-recording properties.
    __dict__['_mock_called'] = False
    __dict__['_mock_call_args'] = None
    __dict__['_mock_call_count'] = 0
    __dict__['_mock_call_args_list'] = _CallList()
    __dict__['_mock_mock_calls'] = _CallList()

    __dict__['method_calls'] = _CallList()
    __dict__['_mock_unsafe'] = unsafe

    if kwargs:
        self.configure_mock(**kwargs)

    # Forward to the next __init__ in the MRO.
    _safe_super(NonCallableMock, self).__init__(
        spec, wraps, name, spec_set, parent,
        _spec_state
    )
479
480
def attach_mock(self, mock, attribute):
    """
    Attach *mock* to this mock under the name *attribute*.

    The name and parent links of the attached mock are cleared first, so
    that the attribute assignment can replace them.  Calls to the
    attached mock will be recorded in the `method_calls` and `mock_calls`
    attributes of this one."""
    inner_mock = _extract_mock(mock)

    cleared_links = (
        ('_mock_parent', None),
        ('_mock_new_parent', None),
        ('_mock_name', ''),
        ('_mock_new_name', None),
    )
    for link_name, blank in cleared_links:
        setattr(inner_mock, link_name, blank)

    setattr(self, attribute, mock)
494
495
def mock_add_spec(self, spec, spec_set=False):
    """Give this mock a spec.

    `spec` is either an object or a list of strings; only attributes
    present on the spec can then be fetched from the mock.  With
    `spec_set` true, setting is restricted to those attributes as well."""
    self._mock_add_spec(spec, spec_set)
503
504
def _mock_add_spec(self, spec, spec_set, _spec_as_instance=False,
                   _eat_self=False):
    """Record *spec* on this mock.

    Stores the spec class, its signature (if obtainable), the list of
    allowed attribute names and the names of coroutine-function attributes
    directly in ``self.__dict__``.

    Raises InvalidSpecError if *spec* is itself a mock.
    """
    if _is_instance_mock(spec):
        raise InvalidSpecError(f'Cannot spec a Mock object. [object={spec!r}]')

    _spec_class = None
    _spec_signature = None
    _spec_asyncs = []

    # Collect the attributes of the spec that are coroutine functions.
    # NOTE(review): getattr() performs a normal attribute lookup on the
    # spec here.
    for attr in dir(spec):
        if iscoroutinefunction(getattr(spec, attr, None)):
            _spec_asyncs.append(attr)

    # A list/tuple spec is already the list of names; anything else is an
    # object whose class, signature and dir() are recorded.
    if spec is not None and not _is_list(spec):
        if isinstance(spec, type):
            _spec_class = spec
        else:
            _spec_class = type(spec)
        res = _get_signature_object(spec,
                                    _spec_as_instance, _eat_self)
        # res is None or a (func, signature) tuple.
        _spec_signature = res and res[1]

        spec = dir(spec)

    __dict__ = self.__dict__
    __dict__['_spec_class'] = _spec_class
    __dict__['_spec_set'] = spec_set
    __dict__['_spec_signature'] = _spec_signature
    __dict__['_mock_methods'] = spec
    __dict__['_spec_asyncs'] = _spec_asyncs
535
def __get_return_value(self):
    """Return the configured return value, creating a child mock if unset."""
    ret = self._mock_return_value
    delegate = self._mock_delegate
    if delegate is not None:
        ret = delegate.return_value

    if ret is not DEFAULT:
        return ret

    # Nothing configured yet: create the '()' child and remember it.
    ret = self._get_child_mock(_new_parent=self, _new_name='()')
    self.return_value = ret
    return ret


def __set_return_value(self, value):
    """Store *value* on the delegate if there is one, else on this mock."""
    delegate = self._mock_delegate
    if delegate is None:
        self._mock_return_value = value
        # Adopt the value as the '()' child when that is permissible.
        _check_and_set_parent(self, value, None, '()')
    else:
        delegate.return_value = value

__return_value_doc = "The value to be returned when the mock is called."
return_value = property(__get_return_value, __set_return_value,
                        __return_value_doc)
559
560
561 @property
562 def __class__(self):
563 if self._spec_class is None:
564 return type(self)
565 return self._spec_class
566
# Call-recording attributes.  Each is a property that uses the matching
# '_mock_<name>' slot on the mock while there is no delegate, and the
# delegate's attribute of the same name otherwise.
called = _delegating_property('called')
call_count = _delegating_property('call_count')
call_args = _delegating_property('call_args')
call_args_list = _delegating_property('call_args_list')
mock_calls = _delegating_property('mock_calls')
572
573
def __get_side_effect(self):
    """Return the side effect, read from the delegate when there is one.

    A delegate's side effect that is not None, not callable, not already
    a _MockIter and not an exception is wrapped in a _MockIter, and the
    wrapper is stored back on the delegate.
    """
    delegated = self._mock_delegate
    if delegated is None:
        return self._mock_side_effect

    effect = delegated.side_effect
    leave_alone = (
        effect is None or callable(effect)
        or isinstance(effect, _MockIter) or _is_exception(effect)
    )
    if not leave_alone:
        effect = _MockIter(effect)
        delegated.side_effect = effect
    return effect

def __set_side_effect(self, value):
    """Store *value* (passed through _try_iter) on the delegate or on self."""
    value = _try_iter(value)
    delegated = self._mock_delegate
    if delegated is not None:
        delegated.side_effect = value
    else:
        self._mock_side_effect = value

side_effect = property(__get_side_effect, __set_side_effect)
594
595
def reset_mock(self, visited=None,*, return_value=False, side_effect=False):
    "Restore the mock object to its initial state."
    # `visited` holds the ids of mocks already reset during this pass so
    # that cyclic mock graphs terminate.
    if visited is None:
        visited = []
    own_id = id(self)
    if own_id in visited:
        return
    visited.append(own_id)

    self.called = False
    self.call_args = None
    self.call_count = 0
    self.mock_calls = _CallList()
    self.call_args_list = _CallList()
    self.method_calls = _CallList()

    # The configured return value / side effect survive unless asked.
    if return_value:
        self._mock_return_value = DEFAULT
    if side_effect:
        self._mock_side_effect = None

    for child in self._mock_children.values():
        is_placeholder = isinstance(child, _SpecState) or child is _deleted
        if not is_placeholder:
            child.reset_mock(visited, return_value=return_value,
                             side_effect=side_effect)

    ret = self._mock_return_value
    if _is_instance_mock(ret) and ret is not self:
        ret.reset_mock(visited)
624
625
def configure_mock(self, /, **kwargs):
    """Set attributes on the mock through keyword arguments.

    A dotted key addresses an attribute of a child mock, so return values
    and side effects of children can be configured by unpacking a
    dictionary in the call:

    >>> attrs = {'method.return_value': 3, 'other.side_effect': KeyError}
    >>> mock.configure_mock(**attrs)"""
    # Keys with fewer dots go first, so that attributes are set before
    # attributes of those attributes.
    by_depth = sorted(kwargs.items(), key=lambda entry: entry[0].count('.'))
    for dotted_name, val in by_depth:
        *path, final = dotted_name.split('.')
        obj = self
        for step in path:
            obj = getattr(obj, step)
        setattr(obj, final, val)
646
647
def __getattr__(self, name):
    # Called by Python only when normal attribute lookup has failed.
    # Returns the child mock for `name`, creating and caching it in
    # _mock_children on first access.
    if name in {'_mock_methods', '_mock_unsafe'}:
        # These two are read below; failing fast here avoids recursing
        # into this method when they are not set yet.
        raise AttributeError(name)
    elif self._mock_methods is not None:
        # A spec is present: only its non-magic names may be fetched.
        if name not in self._mock_methods or name in _all_magics:
            raise AttributeError("Mock object has no attribute %r" % name)
    elif _is_magic(name):
        raise AttributeError(name)
    if not self._mock_unsafe and (not self._mock_methods or name not in self._mock_methods):
        # Reject names that look like (misspelled) assertion methods.
        if name.startswith(('assert', 'assret', 'asert', 'aseert', 'assrt')):
            raise AttributeError(
                f"{name!r} is not a valid assertion. Use a spec "
                f"for the mock if {name!r} is meant to be an attribute.")

    # The class-level lock guards the lookup/creation of the child.
    with NonCallableMock._lock:
        result = self._mock_children.get(name)
        if result is _deleted:
            raise AttributeError(name)
        elif result is None:
            wraps = None
            if self._mock_wraps is not None:
                # XXXX should we get the attribute without triggering code
                # execution?
                wraps = getattr(self._mock_wraps, name)

            result = self._get_child_mock(
                parent=self, name=name, wraps=wraps, _new_name=name,
                _new_parent=self
            )
            self._mock_children[name] = result

        elif isinstance(result, _SpecState):
            # The child is still a placeholder: build the autospecced
            # mock now and cache it.
            try:
                result = create_autospec(
                    result.spec, result.spec_set, result.instance,
                    result.parent, result.name
                )
            except InvalidSpecError:
                target_name = self.__dict__['_mock_name'] or self
                raise InvalidSpecError(
                    f'Cannot autospec attr {name!r} from target '
                    f'{target_name!r} as it has already been mocked out. '
                    f'[target={self!r}, attr={result.spec!r}]')
            self._mock_children[name] = result

    return result
694
695
696 def _extract_mock_name(self):
697 _name_list = [self._mock_new_name]
698 _parent = self._mock_new_parent
699 last = self
700
701 dot = '.'
702 if _name_list == ['()']:
703 dot = ''
704
705 while _parent is not None:
706 last = _parent
707
708 _name_list.append(_parent._mock_new_name + dot)
709 dot = '.'
710 if _parent._mock_new_name == '()':
711 dot = ''
712
713 _parent = _parent._mock_new_parent
714
715 _name_list = list(reversed(_name_list))
716 _first = last._mock_name or 'mock'
717 if len(_name_list) > 1:
718 if _name_list[1] not in ('()', '().'):
719 _first += '.'
720 _name_list[0] = _first
721 return ''.join(_name_list)
722
723 def __repr__(self):
724 name = self._extract_mock_name()
725
726 name_string = ''
727 if name not in ('mock', 'mock.'):
728 name_string = ' name=%r' % name
729
730 spec_string = ''
731 if self._spec_class is not None:
732 spec_string = ' spec=%r'
733 if self._spec_set:
734 spec_string = ' spec_set=%r'
735 spec_string = spec_string % self._spec_class.__name__
736 return "<%s%s%s id='%s'>" % (
737 type(self).__name__,
738 name_string,
739 spec_string,
740 id(self)
741 )
742
743
def __dir__(self):
    """Filter the output of `dir(mock)` to only useful members.

    With FILTER_DIR false the unfiltered ``object.__dir__`` result is
    returned.  Otherwise the result is the sorted union of: the names
    allowed by the spec, the public names on the type, the public or
    magic names in the instance dict, and the child mocks that have not
    been deleted.
    """
    if not FILTER_DIR:
        return object.__dir__(self)

    # _mock_methods is the spec as it was supplied; a tuple spec would
    # make the list concatenation below raise TypeError, so it is
    # normalised to a list first.
    extras = list(self._mock_methods or [])
    from_type = [e for e in dir(type(self)) if not e.startswith('_')]
    from_dict = [e for e in self.__dict__ if not e.startswith('_') or
                 _is_magic(e)]
    from_child_mocks = [
        m_name for m_name, m_value in self._mock_children.items()
        if m_value is not _deleted]

    return sorted(set(extras + from_type + from_dict + from_child_mocks))
760
761
def __setattr__(self, name, value):
    # Central attribute-setting hook: enforces spec_set and sealing, and
    # wires up magic methods and child mocks.
    if name in _allowed_names:
        # property setters go through here
        return object.__setattr__(self, name, value)
    elif (self._spec_set and self._mock_methods is not None and
          name not in self._mock_methods and
          name not in self.__dict__):
        # spec_set forbids creating attributes that are not on the spec.
        raise AttributeError("Mock object has no attribute '%s'" % name)
    elif name in _unsupported_magics:
        msg = 'Attempting to set unsupported magic method %r.' % name
        raise AttributeError(msg)
    elif name in _all_magics:
        if self._mock_methods is not None and name not in self._mock_methods:
            raise AttributeError("Mock object has no attribute '%s'" % name)

        if not _is_instance_mock(value):
            # Magic methods are set on the (per-instance) type; the value
            # stored on the instance is a wrapper passing `self` first.
            setattr(type(self), name, _get_method(name, value))
            original = value
            value = lambda *args, **kw: original(self, *args, **kw)
        else:
            # only set _new_name and not name so that mock_calls is tracked
            # but not method calls
            _check_and_set_parent(self, value, None, name)
            setattr(type(self), name, value)
            self._mock_children[name] = value
    elif name == '__class__':
        # Assigning __class__ only changes the reported spec class.
        self._spec_class = value
        return
    else:
        # Ordinary attribute: adopt a parentless mock as a child.
        if _check_and_set_parent(self, value, name, name):
            self._mock_children[name] = value

    # A sealed mock refuses to grow attributes it does not already have.
    if self._mock_sealed and not hasattr(self, name):
        mock_name = f'{self._extract_mock_name()}.{name}'
        raise AttributeError(f'Cannot set {mock_name}')

    return object.__setattr__(self, name, value)
799
800
def __delattr__(self, name):
    # Delete an attribute and leave a _deleted marker in _mock_children so
    # that later lookups of the name raise AttributeError.
    if name in _all_magics and name in type(self).__dict__:
        # Magic methods live on the per-instance type.
        delattr(type(self), name)
        if name not in self.__dict__:
            # for magic methods that are still MagicProxy objects and
            # not set on the instance itself
            return

    obj = self._mock_children.get(name, _missing)
    if name in self.__dict__:
        _safe_super(NonCallableMock, self).__delattr__(name)
    elif obj is _deleted:
        # Already deleted once: deleting again is an error.
        raise AttributeError(name)
    if obj is not _missing:
        del self._mock_children[name]
    self._mock_children[name] = _deleted
817
818
def _format_mock_call_signature(self, args, kwargs):
    """Format a call with *args*/*kwargs* under this mock's name.

    ``'mock'`` is used when the mock has no name.
    """
    return _format_call_signature(self._mock_name or 'mock', args, kwargs)
822
823
824 def _format_mock_failure_message(self, args, kwargs, action='call'):
825 message = 'expected %s not found.\nExpected: %s\n Actual: %s'
826 expected_string = self._format_mock_call_signature(args, kwargs)
827 call_args = self.call_args
828 actual_string = self._format_mock_call_signature(*call_args)
829 return message % (action, expected_string, actual_string)
830
831
def _get_call_signature_from_name(self, name):
    """
    Look up the spec signature that applies to the call named *name*.

    * An empty name means the call is asserted directly against this
      method/function, so this mock's own spec signature is returned.
    * Otherwise the ``()`` parts are removed and the name is split on
      '.'; the resulting names are followed through the child mocks.
      The walk stops at a missing child or at a _SpecState (child mocks
      are only created on attribute access, so a _SpecState means the
      spec attribute was never accessed).
    """
    if not name:
        return self._spec_signature

    sig = None
    children = self._mock_children
    for part in name.replace('()', '').split('.'):
        child = children.get(part)
        if child is None or isinstance(child, _SpecState):
            break
        # If an autospecced object is attached using attach_mock the
        # child would be a function with mock object as attribute from
        # which signature has to be derived.
        child = _extract_mock(child)
        children = child._mock_children
        sig = child._spec_signature

    return sig
863
864
def _call_matcher(self, _call):
    """
    Turn a call (or a plain (args, kwargs) tuple) into a comparison key.

    When a spec signature is available the arguments are bound to it; a
    binding failure yields the TypeError itself (traceback removed) as
    the key.  Without a signature the call is returned unchanged, so this
    is a best-effort normalisation.
    """
    if isinstance(_call, tuple) and len(_call) > 2:
        sig = self._get_call_signature_from_name(_call[0])
    else:
        sig = self._spec_signature

    if sig is None:
        return _call

    if len(_call) == 2:
        name = ''
        args, kwargs = _call
    else:
        name, args, kwargs = _call
    try:
        bound_call = sig.bind(*args, **kwargs)
        return call(name, bound_call.args, bound_call.kwargs)
    except TypeError as e:
        return e.with_traceback(None)
891
def assert_not_called(self):
    """assert that the mock has no recorded calls.
    """
    if self.call_count == 0:
        return
    details = (self._mock_name or 'mock', self.call_count, self._calls_repr())
    raise AssertionError(
        "Expected '%s' to not have been called. Called %s times.%s" % details)
901
def assert_called(self):
    """assert that the mock has at least one recorded call.
    """
    if self.call_count == 0:
        raise AssertionError(
            "Expected '%s' to have been called." % (self._mock_name or 'mock'))
909
def assert_called_once(self):
    """assert that the mock has exactly one recorded call.
    """
    if self.call_count == 1:
        return
    details = (self._mock_name or 'mock', self.call_count, self._calls_repr())
    raise AssertionError(
        "Expected '%s' to have been called once. Called %s times.%s" % details)
919
def assert_called_with(self, /, *args, **kwargs):
    """assert that the most recent call used the given arguments.

    Raises an AssertionError if the mock was never called, or if the
    args and keyword args passed in differ from those of the last call."""
    if self.call_args is None:
        expected = self._format_mock_call_signature(args, kwargs)
        raise AssertionError(
            'expected call not found.\nExpected: %s\n Actual: %s'
            % (expected, 'not called.'))

    expected = self._call_matcher(_Call((args, kwargs), two=True))
    actual = self._call_matcher(self.call_args)
    if actual != expected:
        # A matcher result that is an exception explains why the expected
        # call could not be bound; chain it as the cause.
        cause = expected if isinstance(expected, Exception) else None
        message = self._format_mock_failure_message(args, kwargs)
        raise AssertionError(message) from cause
940
941
def assert_called_once_with(self, /, *args, **kwargs):
    """assert that the mock has exactly one recorded call and that the
    call used the given arguments."""
    if self.call_count == 1:
        return self.assert_called_with(*args, **kwargs)
    details = (self._mock_name or 'mock', self.call_count, self._calls_repr())
    raise AssertionError(
        "Expected '%s' to be called once. Called %s times.%s" % details)
952
953
def assert_has_calls(self, calls, any_order=False):
    """assert the mock has been called with the specified calls.
    The `mock_calls` list is checked for the calls.

    If `any_order` is False (the default) then the calls must be
    sequential. There can be extra calls before or after the
    specified calls.

    If `any_order` is True then the calls can be in any order, but
    they must all appear in `mock_calls`."""
    # Normalise expected and recorded calls with the same matcher so that
    # they compare consistently.
    expected = [self._call_matcher(c) for c in calls]
    # The first expected call that failed to bind (if any) is chained as
    # the cause of the assertion error.
    cause = next((e for e in expected if isinstance(e, Exception)), None)
    all_calls = _CallList(self._call_matcher(c) for c in self.mock_calls)
    if not any_order:
        # _CallList.__contains__ with a list operand checks for a
        # contiguous run of calls.
        if expected not in all_calls:
            if cause is None:
                problem = 'Calls not found.'
            else:
                problem = ('Error processing expected calls.\n'
                           'Errors: {}').format(
                               [e if isinstance(e, Exception) else None
                                for e in expected])
            raise AssertionError(
                f'{problem}\n'
                f'Expected: {_CallList(calls)}'
                f'{self._calls_repr(prefix=" Actual").rstrip(".")}'
            ) from cause
        return

    all_calls = list(all_calls)

    # Any-order mode: remove each expected call from the recorded calls,
    # so duplicates must appear as many times as they are expected.
    not_found = []
    for kall in expected:
        try:
            all_calls.remove(kall)
        except ValueError:
            not_found.append(kall)
    if not_found:
        raise AssertionError(
            '%r does not contain all of %r in its call list, '
            'found %r instead' % (self._mock_name or 'mock',
                                  tuple(not_found), all_calls)
        ) from cause
997
998
def assert_any_call(self, /, *args, **kwargs):
    """assert that some recorded call used the given arguments.

    Unlike `assert_called_with` and `assert_called_once_with`, which only
    look at the most recent call, this passes if the mock has *ever* been
    called with these arguments."""
    expected = self._call_matcher(_Call((args, kwargs), two=True))
    cause = expected if isinstance(expected, Exception) else None
    actual = [self._call_matcher(c) for c in self.call_args_list]
    if not cause and expected in _AnyComparer(actual):
        return
    expected_string = self._format_mock_call_signature(args, kwargs)
    raise AssertionError('%s call not found' % expected_string) from cause
1013
1014
def _get_child_mock(self, /, **kw):
    """Create the child mocks for attributes and return value.
    By default child mocks will be the same type as the parent.
    Subclasses of Mock may want to override this to customize the way
    child mocks are made.

    For non-callable mocks the callable variant will be used (rather than
    any custom subclass)."""
    # A sealed mock never creates children; the error names the attribute
    # (or '()' for the return value) that was requested.
    if self._mock_sealed:
        attribute = f".{kw['name']}" if "name" in kw else "()"
        mock_name = self._extract_mock_name() + attribute
        raise AttributeError(mock_name)

    # Attributes recorded as coroutine functions on the spec always get
    # an AsyncMock.
    _new_name = kw.get("_new_name")
    if _new_name in self.__dict__['_spec_asyncs']:
        return AsyncMock(**kw)

    _type = type(self)
    if issubclass(_type, MagicMock) and _new_name in _async_method_magics:
        # Any asynchronous magic becomes an AsyncMock
        klass = AsyncMock
    elif issubclass(_type, AsyncMockMixin):
        if (_new_name in _all_sync_magics or
                self._mock_methods and _new_name in self._mock_methods):
            # Any synchronous method on AsyncMock becomes a MagicMock
            klass = MagicMock
        else:
            klass = AsyncMock
    elif not issubclass(_type, CallableMixin):
        # Non-callable parents produce the callable variant.
        if issubclass(_type, NonCallableMagicMock):
            klass = MagicMock
        elif issubclass(_type, NonCallableMock):
            klass = Mock
    else:
        # type(self) is the per-instance class made in __new__; the next
        # MRO entry is the class the mock was created from.
        klass = _type.__mro__[1]
    return klass(**kw)
1051
1052
1053 def _calls_repr(self, prefix="Calls"):
1054 """Renders self.mock_calls as a string.
1055
1056 Example: "\nCalls: [call(1), call(2)]."
1057
1058 If self.mock_calls is empty, an empty string is returned. The
1059 output will be truncated if very long.
1060 """
1061 if not self.mock_calls:
1062 return ""
1063 return f"\n{prefix}: {safe_repr(self.mock_calls)}."
1064
1065
# Signature object for NonCallableMock.__init__, computed once when the
# module is imported.
_MOCK_SIG = inspect.signature(NonCallableMock.__init__)
1067
1068
1069 class ESC[4;38;5;81m_AnyComparer(ESC[4;38;5;149mlist):
1070 """A list which checks if it contains a call which may have an
1071 argument of ANY, flipping the components of item and self from
1072 their traditional locations so that ANY is guaranteed to be on
1073 the left."""
1074 def __contains__(self, item):
1075 for _call in self:
1076 assert len(item) == len(_call)
1077 if all([
1078 expected == actual
1079 for expected, actual in zip(item, _call)
1080 ]):
1081 return True
1082 return False
1083
1084
1085 def _try_iter(obj):
1086 if obj is None:
1087 return obj
1088 if _is_exception(obj):
1089 return obj
1090 if _callable(obj):
1091 return obj
1092 try:
1093 return iter(obj)
1094 except TypeError:
1095 # XXXX backwards compatibility
1096 # but this will blow up on first call - so maybe we should fail early?
1097 return obj
1098
1099
class CallableMixin(Base):
    """Mixin that makes a mock callable and records each call made to it."""

    def __init__(self, spec=None, side_effect=None, return_value=DEFAULT,
                 wraps=None, name=None, spec_set=None, parent=None,
                 _spec_state=None, _new_name='', _new_parent=None, **kwargs):
        # The return value is written directly into the instance __dict__
        # before the base initialiser runs.
        self.__dict__['_mock_return_value'] = return_value
        _safe_super(CallableMixin, self).__init__(
            spec, wraps, name, spec_set, parent,
            _spec_state, _new_name, _new_parent, **kwargs
        )

        self.side_effect = side_effect


    def _mock_check_sig(self, /, *args, **kwargs):
        # stub method that can be replaced with one with a specific signature
        pass


    def __call__(self, /, *args, **kwargs):
        """Check the signature, record the call, then produce its result."""
        # can't use self in-case a function / method we are mocking uses self
        # in the signature
        self._mock_check_sig(*args, **kwargs)
        self._increment_mock_call(*args, **kwargs)
        return self._mock_call(*args, **kwargs)


    def _mock_call(self, /, *args, **kwargs):
        """Delegate to `_execute_mock_call` with the same arguments."""
        return self._execute_mock_call(*args, **kwargs)

    def _increment_mock_call(self, /, *args, **kwargs):
        """Record one call on this mock and on every mock up its parent chain.

        Updates `called`, `call_count`, `call_args` and `call_args_list` on
        this mock, appends to its `mock_calls`, and appends named entries to
        the `method_calls` / `mock_calls` lists of each `_mock_new_parent`
        ancestor.
        """
        self.called = True
        self.call_count += 1

        # handle call_args
        # needs to be set here so assertions on call arguments pass before
        # execution in the case of awaited calls
        _call = _Call((args, kwargs), two=True)
        self.call_args = _call
        self.call_args_list.append(_call)

        # initial stuff for method_calls:
        do_method_calls = self._mock_parent is not None
        method_call_name = self._mock_name

        # initial stuff for mock_calls:
        mock_call_name = self._mock_new_name
        is_a_call = mock_call_name == '()'
        # On the mock itself the call is recorded with an empty name.
        self.mock_calls.append(_Call(('', args, kwargs)))

        # follow up the chain of mocks:
        _new_parent = self._mock_new_parent
        while _new_parent is not None:

            # handle method_calls:
            if do_method_calls:
                _new_parent.method_calls.append(_Call((method_call_name, args, kwargs)))
                # Keep recording method calls only while the ancestor itself
                # has a _mock_parent; the name grows by one dotted prefix
                # per level.
                do_method_calls = _new_parent._mock_parent is not None
                if do_method_calls:
                    method_call_name = _new_parent._mock_name + '.' + method_call_name

            # handle mock_calls:
            this_mock_call = _Call((mock_call_name, args, kwargs))
            _new_parent.mock_calls.append(this_mock_call)

            if _new_parent._mock_new_name:
                # A name of '()' is joined to the ancestor's name without a
                # dot; any other name is joined with one.
                if is_a_call:
                    dot = ''
                else:
                    dot = '.'
                is_a_call = _new_parent._mock_new_name == '()'
                mock_call_name = _new_parent._mock_new_name + dot + mock_call_name

            # follow the parental chain:
            _new_parent = _new_parent._mock_new_parent

    def _execute_mock_call(self, /, *args, **kwargs):
        """Compute the result of a call.

        Order of precedence: `side_effect` (an exception is raised, a
        non-callable is advanced with `next()`, a callable is called), then
        an explicitly set return value, then the wrapped object, and finally
        `self.return_value`. A side effect that yields `DEFAULT` falls
        through to the later steps.
        """
        # separate from _increment_mock_call so that awaited functions are
        # executed separately from their call, also AsyncMock overrides this method

        effect = self.side_effect
        if effect is not None:
            if _is_exception(effect):
                raise effect
            elif not _callable(effect):
                # Non-callable side effects are consumed one item per call;
                # an item that is an exception is raised, not returned.
                result = next(effect)
                if _is_exception(result):
                    raise result
            else:
                result = effect(*args, **kwargs)

            if result is not DEFAULT:
                return result

        if self._mock_return_value is not DEFAULT:
            return self.return_value

        if self._mock_wraps is not None:
            return self._mock_wraps(*args, **kwargs)

        return self.return_value
1201
1202
1203
class Mock(CallableMixin, NonCallableMock):
    """
    Create a new `Mock` object. `Mock` takes several optional arguments
    that specify the behaviour of the Mock object:

    * `spec`: This can be either a list of strings or an existing object (a
      class or instance) that acts as the specification for the mock object. If
      you pass in an object then a list of strings is formed by calling dir on
      the object (excluding unsupported magic attributes and methods). Accessing
      any attribute not in this list will raise an `AttributeError`.

      If `spec` is an object (rather than a list of strings) then
      `mock.__class__` returns the class of the spec object. This allows mocks
      to pass `isinstance` tests.

    * `spec_set`: A stricter variant of `spec`. If used, attempting to *set*
      or get an attribute on the mock that isn't on the object passed as
      `spec_set` will raise an `AttributeError`.

    * `side_effect`: A function to be called whenever the Mock is called. See
      the `side_effect` attribute. Useful for raising exceptions or
      dynamically changing return values. The function is called with the same
      arguments as the mock, and unless it returns `DEFAULT`, the return
      value of this function is used as the return value.

      If `side_effect` is an exception class or instance, it is raised when
      the mock is called.

      If `side_effect` is an iterable then each call to the mock will return
      the next value from the iterable. If any of the members of the iterable
      are exceptions they will be raised instead of returned.

    * `return_value`: The value returned when the mock is called. By default
      this is a new Mock (created on first access). See the
      `return_value` attribute.

    * `unsafe`: By default, accessing any attribute whose name starts with
      *assert*, *assret*, *asert*, *aseert* or *assrt* will raise an
      AttributeError. Passing `unsafe=True` will allow access to
      these attributes.

    * `wraps`: Item for the mock object to wrap. If `wraps` is not None then
      calling the Mock will pass the call through to the wrapped object
      (returning the real result). Attribute access on the mock will return a
      Mock object that wraps the corresponding attribute of the wrapped object
      (so attempting to access an attribute that doesn't exist will raise an
      `AttributeError`).

      If the mock has an explicit `return_value` set then calls are not passed
      to the wrapped object and the `return_value` is returned instead.

    * `name`: If the mock has a name then it will be used in the repr of the
      mock. This can be useful for debugging. The name is propagated to child
      mocks.

    Mocks can also be called with arbitrary keyword arguments. These will be
    used to set attributes on the mock after it is created.
    """
1259
1260
1261 # _check_spec_arg_typos takes kwargs from commands like patch and checks that
1262 # they don't contain common misspellings of arguments related to autospeccing.
1263 def _check_spec_arg_typos(kwargs_to_check):
1264 typos = ("autospect", "auto_spec", "set_spec")
1265 for typo in typos:
1266 if typo in kwargs_to_check:
1267 raise RuntimeError(
1268 f"{typo!r} might be a typo; use unsafe=True if this is intended"
1269 )
1270
1271
class _patch(object):
    """Patcher that temporarily replaces one attribute of a target object.

    Instances work as context managers, as function / coroutine-function
    decorators and as class decorators, and can also be driven manually
    with `start()` / `stop()`.
    """

    # Set by patch.multiple: the keyword under which the created mock is
    # handed to decorated functions / returned from __enter__.
    attribute_name = None
    # Patchers activated through start(); shared by all instances.
    _active_patches = []

    def __init__(
            self, getter, attribute, new, spec, create,
            spec_set, autospec, new_callable, kwargs, *, unsafe=False
        ):
        """Validate the argument combination and store the configuration.

        Raises `ValueError` when `new_callable` is combined with `new` or
        `autospec`, `RuntimeError` (via `_check_spec_arg_typos`) for
        suspicious kwargs unless `unsafe` is true, and `InvalidSpecError`
        when `spec` or `spec_set` is itself a mock.
        """
        if new_callable is not None:
            if new is not DEFAULT:
                raise ValueError(
                    "Cannot use 'new' and 'new_callable' together"
                )
            if autospec is not None:
                raise ValueError(
                    "Cannot use 'autospec' and 'new_callable' together"
                )
        if not unsafe:
            _check_spec_arg_typos(kwargs)
        if _is_instance_mock(spec):
            raise InvalidSpecError(
                f'Cannot spec attr {attribute!r} as the spec '
                f'has already been mocked out. [spec={spec!r}]')
        if _is_instance_mock(spec_set):
            raise InvalidSpecError(
                f'Cannot spec attr {attribute!r} as the spec_set '
                f'target has already been mocked out. [spec_set={spec_set!r}]')

        self.getter = getter
        self.attribute = attribute
        self.new = new
        self.new_callable = new_callable
        self.spec = spec
        self.create = create
        self.has_local = False
        self.spec_set = spec_set
        self.autospec = autospec
        self.kwargs = kwargs
        self.additional_patchers = []
        # Remembered so copy() can rebuild the patcher with the same typo
        # checking policy.
        self.unsafe = unsafe


    def copy(self):
        """Return a new patcher with the same configuration.

        The `unsafe` flag is carried over; without it a patcher created
        with `unsafe=True` and a typo-like kwarg would raise `RuntimeError`
        when copied (as `decorate_class` does for every test method).
        """
        patcher = _patch(
            self.getter, self.attribute, self.new, self.spec,
            self.create, self.spec_set,
            self.autospec, self.new_callable, self.kwargs,
            unsafe=self.unsafe
        )
        patcher.attribute_name = self.attribute_name
        patcher.additional_patchers = [
            p.copy() for p in self.additional_patchers
        ]
        return patcher


    def __call__(self, func):
        """Decorate a class, a coroutine function or a plain callable."""
        if isinstance(func, type):
            return self.decorate_class(func)
        if inspect.iscoroutinefunction(func):
            return self.decorate_async_callable(func)
        return self.decorate_callable(func)


    def decorate_class(self, klass):
        """Wrap every callable attribute whose name starts with
        `patch.TEST_PREFIX` with a copy of this patcher; return `klass`."""
        for attr in dir(klass):
            if not attr.startswith(patch.TEST_PREFIX):
                continue

            attr_value = getattr(klass, attr)
            if not hasattr(attr_value, "__call__"):
                continue

            # Each method gets its own patcher instance.
            patcher = self.copy()
            setattr(klass, attr, patcher(attr_value))
        return klass


    @contextlib.contextmanager
    def decoration_helper(self, patched, args, keywargs):
        """Enter all patchings stacked on `patched` and yield call arguments.

        Yields `(args, keywargs)` where results of patchers with an
        `attribute_name` have been merged into `keywargs` and mocks created
        for patchers whose `new` is `DEFAULT` have been appended to `args`.
        All patchings are exited when the context ends.
        """
        extra_args = []
        with contextlib.ExitStack() as exit_stack:
            for patching in patched.patchings:
                arg = exit_stack.enter_context(patching)
                if patching.attribute_name is not None:
                    keywargs.update(arg)
                elif patching.new is DEFAULT:
                    extra_args.append(arg)

            args += tuple(extra_args)
            yield (args, keywargs)


    def decorate_callable(self, func):
        """Return `func` wrapped so the patch is active during each call."""
        # NB. Keep the method in sync with decorate_async_callable()
        if hasattr(func, 'patchings'):
            # Already wrapped by another patcher: just join its list.
            func.patchings.append(self)
            return func

        @wraps(func)
        def patched(*args, **keywargs):
            with self.decoration_helper(patched,
                                        args,
                                        keywargs) as (newargs, newkeywargs):
                return func(*newargs, **newkeywargs)

        patched.patchings = [self]
        return patched


    def decorate_async_callable(self, func):
        """Coroutine-function counterpart of `decorate_callable`."""
        # NB. Keep the method in sync with decorate_callable()
        if hasattr(func, 'patchings'):
            # Already wrapped by another patcher: just join its list.
            func.patchings.append(self)
            return func

        @wraps(func)
        async def patched(*args, **keywargs):
            with self.decoration_helper(patched,
                                        args,
                                        keywargs) as (newargs, newkeywargs):
                return await func(*newargs, **newkeywargs)

        patched.patchings = [self]
        return patched


    def get_original(self):
        """Return `(original, local)` for the attribute being patched.

        `original` is the current value, or `DEFAULT` when the attribute
        does not exist. `local` is true when the value was found in the
        target's own `__dict__`. Raises `AttributeError` when the attribute
        is missing and `create` is false.
        """
        target = self.getter()
        name = self.attribute

        original = DEFAULT
        local = False

        try:
            original = target.__dict__[name]
        except (AttributeError, KeyError):
            original = getattr(target, name, DEFAULT)
        else:
            local = True

        # Builtin names may always be created on a module.
        if name in _builtins and isinstance(target, ModuleType):
            self.create = True

        if not self.create and original is DEFAULT:
            raise AttributeError(
                "%s does not have the attribute %r" % (target, name)
            )
        return original, local


    def __enter__(self):
        """Perform the patch."""
        new, spec, spec_set = self.new, self.spec, self.spec_set
        autospec, kwargs = self.autospec, self.kwargs
        new_callable = self.new_callable
        self.target = self.getter()

        # normalise False to None
        if spec is False:
            spec = None
        if spec_set is False:
            spec_set = None
        if autospec is False:
            autospec = None

        if spec is not None and autospec is not None:
            raise TypeError("Can't specify spec and autospec")
        if ((spec is not None or autospec is not None) and
            spec_set not in (True, None)):
            raise TypeError("Can't provide explicit spec_set *and* spec or autospec")

        original, local = self.get_original()

        if new is DEFAULT and autospec is None:
            inherit = False
            if spec is True:
                # set spec to the object we are replacing
                spec = original
                if spec_set is True:
                    spec_set = original
                    spec = None
            elif spec is not None:
                if spec_set is True:
                    spec_set = spec
                    spec = None
            elif spec_set is True:
                spec_set = original

            if spec is not None or spec_set is not None:
                if original is DEFAULT:
                    raise TypeError("Can't use 'spec' with create=True")
                if isinstance(original, type):
                    # If we're patching out a class and there is a spec
                    inherit = True
            if spec is None and _is_async_obj(original):
                Klass = AsyncMock
            else:
                Klass = MagicMock
            _kwargs = {}
            if new_callable is not None:
                Klass = new_callable
            elif spec is not None or spec_set is not None:
                this_spec = spec
                if spec_set is not None:
                    this_spec = spec_set
                if _is_list(this_spec):
                    not_callable = '__call__' not in this_spec
                else:
                    not_callable = not callable(this_spec)
                if _is_async_obj(this_spec):
                    Klass = AsyncMock
                elif not_callable:
                    Klass = NonCallableMagicMock

            if spec is not None:
                _kwargs['spec'] = spec
            if spec_set is not None:
                _kwargs['spec_set'] = spec_set

            # add a name to mocks
            if (isinstance(Klass, type) and
                issubclass(Klass, NonCallableMock) and self.attribute):
                _kwargs['name'] = self.attribute

            _kwargs.update(kwargs)
            new = Klass(**_kwargs)

            if inherit and _is_instance_mock(new):
                # we can only tell if the instance should be callable if the
                # spec is not a list
                this_spec = spec
                if spec_set is not None:
                    this_spec = spec_set
                if (not _is_list(this_spec) and not
                    _instance_callable(this_spec)):
                    Klass = NonCallableMagicMock

                # 'name' is only present when it was added above or passed
                # by the caller (e.g. not when new_callable is a plain
                # function), so tolerate its absence.
                _kwargs.pop('name', None)
                new.return_value = Klass(_new_parent=new, _new_name='()',
                                         **_kwargs)
        elif autospec is not None:
            # spec is ignored, new *must* be default, spec_set is treated
            # as a boolean. Should we check spec is not None and that spec_set
            # is a bool?
            if new is not DEFAULT:
                raise TypeError(
                    "autospec creates the mock for you. Can't specify "
                    "autospec and new."
                )
            if original is DEFAULT:
                raise TypeError("Can't use 'autospec' with create=True")
            spec_set = bool(spec_set)
            if autospec is True:
                autospec = original

            if _is_instance_mock(self.target):
                raise InvalidSpecError(
                    f'Cannot autospec attr {self.attribute!r} as the patch '
                    f'target has already been mocked out. '
                    f'[target={self.target!r}, attr={autospec!r}]')
            if _is_instance_mock(autospec):
                target_name = getattr(self.target, '__name__', self.target)
                raise InvalidSpecError(
                    f'Cannot autospec attr {self.attribute!r} from target '
                    f'{target_name!r} as it has already been mocked out. '
                    f'[target={self.target!r}, attr={autospec!r}]')

            new = create_autospec(autospec, spec_set=spec_set,
                                  _name=self.attribute, **kwargs)
        elif kwargs:
            # can't set keyword args when we aren't creating the mock
            # XXXX If new is a Mock we could call new.configure_mock(**kwargs)
            raise TypeError("Can't pass kwargs to a mock we aren't creating")

        new_attr = new

        self.temp_original = original
        self.is_local = local
        self._exit_stack = contextlib.ExitStack()
        try:
            setattr(self.target, self.attribute, new_attr)
            if self.attribute_name is not None:
                # patch.multiple mode: return a dict of the created mocks,
                # keyed by attribute name, for this and all extra patchers.
                extra_args = {}
                if self.new is DEFAULT:
                    extra_args[self.attribute_name] = new
                for patching in self.additional_patchers:
                    arg = self._exit_stack.enter_context(patching)
                    if patching.new is DEFAULT:
                        extra_args.update(arg)
                return extra_args

            return new
        except:
            # Deliberately catches everything: the patch must be undone
            # before the error propagates. Re-raise unless __exit__
            # reports the exception as handled.
            if not self.__exit__(*sys.exc_info()):
                raise

    def __exit__(self, *exc_info):
        """Undo the patch."""
        if self.is_local and self.temp_original is not DEFAULT:
            setattr(self.target, self.attribute, self.temp_original)
        else:
            delattr(self.target, self.attribute)
            if not self.create and (not hasattr(self.target, self.attribute) or
                        self.attribute in ('__doc__', '__module__',
                                           '__defaults__', '__annotations__',
                                           '__kwdefaults__')):
                # needed for proxy objects like django settings
                setattr(self.target, self.attribute, self.temp_original)

        del self.temp_original
        del self.is_local
        del self.target
        # Unwind any additional patchers entered in __enter__.
        exit_stack = self._exit_stack
        del self._exit_stack
        return exit_stack.__exit__(*exc_info)


    def start(self):
        """Activate a patch, returning any created mock."""
        result = self.__enter__()
        self._active_patches.append(self)
        return result


    def stop(self):
        """Stop an active patch."""
        try:
            self._active_patches.remove(self)
        except ValueError:
            # If the patch hasn't been started this will fail
            return None

        return self.__exit__(None, None, None)
1605
1606
1607
1608 def _get_target(target):
1609 try:
1610 target, attribute = target.rsplit('.', 1)
1611 except (TypeError, ValueError, AttributeError):
1612 raise TypeError(
1613 f"Need a valid target to patch. You supplied: {target!r}")
1614 return partial(pkgutil.resolve_name, target), attribute
1615
1616
def _patch_object(
        target, attribute, new=DEFAULT, spec=None,
        create=False, spec_set=None, autospec=None,
        new_callable=None, *, unsafe=False, **kwargs
    ):
    """
    Patch the named member (`attribute`) on an object (`target`) with a
    mock object.

    `patch.object` can be used as a decorator, class decorator or a context
    manager. The arguments `new`, `spec`, `create`, `spec_set`, `autospec`
    and `new_callable` mean the same as for `patch`, and as with `patch`
    any extra keyword arguments configure the mock object that is created.

    When used as a class decorator `patch.object` honours
    `patch.TEST_PREFIX` for choosing which methods to wrap.

    Raises `TypeError` when `target` is a `str` rather than the object
    itself.
    """
    given_a_name = type(target) is str
    if given_a_name:
        raise TypeError(
            f"{target!r} must be the actual object to be patched, not a str"
        )
    # The patcher expects a callable producing the target.
    return _patch(
        lambda: target, attribute, new, spec, create,
        spec_set, autospec, new_callable, kwargs, unsafe=unsafe
    )
1644
1645
1646 def _patch_multiple(target, spec=None, create=False, spec_set=None,
1647 autospec=None, new_callable=None, **kwargs):
1648 """Perform multiple patches in a single call. It takes the object to be
1649 patched (either as an object or a string to fetch the object by importing)
1650 and keyword arguments for the patches::
1651
1652 with patch.multiple(settings, FIRST_PATCH='one', SECOND_PATCH='two'):
1653 ...
1654
1655 Use `DEFAULT` as the value if you want `patch.multiple` to create
1656 mocks for you. In this case the created mocks are passed into a decorated
1657 function by keyword, and a dictionary is returned when `patch.multiple` is
1658 used as a context manager.
1659
1660 `patch.multiple` can be used as a decorator, class decorator or a context
1661 manager. The arguments `spec`, `spec_set`, `create`,
1662 `autospec` and `new_callable` have the same meaning as for `patch`. These
1663 arguments will be applied to *all* patches done by `patch.multiple`.
1664
1665 When used as a class decorator `patch.multiple` honours `patch.TEST_PREFIX`
1666 for choosing which methods to wrap.
1667 """
1668 if type(target) is str:
1669 getter = partial(pkgutil.resolve_name, target)
1670 else:
1671 getter = lambda: target
1672
1673 if not kwargs:
1674 raise ValueError(
1675 'Must supply at least one keyword argument with patch.multiple'
1676 )
1677 # need to wrap in a list for python 3, where items is a view
1678 items = list(kwargs.items())
1679 attribute, new = items[0]
1680 patcher = _patch(
1681 getter, attribute, new, spec, create, spec_set,
1682 autospec, new_callable, {}
1683 )
1684 patcher.attribute_name = attribute
1685 for attribute, new in items[1:]:
1686 this_patcher = _patch(
1687 getter, attribute, new, spec, create, spec_set,
1688 autospec, new_callable, {}
1689 )
1690 this_patcher.attribute_name = attribute
1691 patcher.additional_patchers.append(this_patcher)
1692 return patcher
1693
1694
def patch(
        target, new=DEFAULT, spec=None, create=False,
        spec_set=None, autospec=None, new_callable=None, *, unsafe=False, **kwargs
    ):
    """
    `patch` acts as a function decorator, class decorator or a context
    manager. Inside the body of the function or with statement, the `target`
    is patched with a `new` object. When the function/with statement exits
    the patch is undone.

    If `new` is omitted, then the target is replaced with an
    `AsyncMock` if the patched object is an async function or a
    `MagicMock` otherwise. If `patch` is used as a decorator and `new` is
    omitted, the created mock is passed in as an extra argument to the
    decorated function. If `patch` is used as a context manager the created
    mock is returned by the context manager.

    `target` should be a string in the form `'package.module.ClassName'`. The
    `target` is imported and the specified object replaced with the `new`
    object, so the `target` must be importable from the environment you are
    calling `patch` from. The target is imported when the decorated function
    is executed, not at decoration time.

    The `spec` and `spec_set` keyword arguments are passed to the `MagicMock`
    if patch is creating one for you.

    In addition you can pass `spec=True` or `spec_set=True`, which causes
    patch to pass in the object being mocked as the spec/spec_set object.

    `new_callable` allows you to specify a different class, or callable object,
    that will be called to create the `new` object. By default `AsyncMock` is
    used for async functions and `MagicMock` for the rest.

    A more powerful form of `spec` is `autospec`. If you set `autospec=True`
    then the mock will be created with a spec from the object being replaced.
    All attributes of the mock will also have the spec of the corresponding
    attribute of the object being replaced. Methods and functions being
    mocked will have their arguments checked and will raise a `TypeError` if
    they are called with the wrong signature. For mocks replacing a class,
    their return value (the 'instance') will have the same spec as the class.

    Instead of `autospec=True` you can pass `autospec=some_object` to use an
    arbitrary object as the spec instead of the one being replaced.

    By default `patch` will fail to replace attributes that don't exist. If
    you pass in `create=True`, and the attribute doesn't exist, patch will
    create the attribute for you when the patched function is called, and
    delete it again afterwards. This is useful for writing tests against
    attributes that your production code creates at runtime. It is off by
    default because it can be dangerous. With it switched on you can write
    passing tests against APIs that don't actually exist!

    Patch can be used as a `TestCase` class decorator. It works by
    decorating each test method in the class. This reduces the boilerplate
    code when your test methods share a common patchings set. `patch` finds
    tests by looking for method names that start with `patch.TEST_PREFIX`.
    By default this is `test`, which matches the way `unittest` finds tests.
    You can specify an alternative prefix by setting `patch.TEST_PREFIX`.

    Patch can be used as a context manager, with the with statement. Here the
    patching applies to the indented block after the with statement. If you
    use "as" then the patched object will be bound to the name after the
    "as"; very useful if `patch` is creating a mock object for you.

    Patch will raise a `RuntimeError` if passed some common misspellings of
    the arguments autospec and spec_set. Pass the argument `unsafe` with the
    value True to disable that check.

    `patch` takes arbitrary keyword arguments. These will be passed to
    `AsyncMock` if the patched object is asynchronous, to `MagicMock`
    otherwise or to `new_callable` if specified.

    `patch.dict(...)`, `patch.multiple(...)` and `patch.object(...)` are
    available for alternate use-cases.
    """
    # Split "pkg.module.attr" into a lazy resolver for the owner and the
    # attribute name; the owner is only resolved when the patch is entered.
    resolve_owner, attr_name = _get_target(target)
    return _patch(
        resolve_owner, attr_name, new, spec, create,
        spec_set, autospec, new_callable, kwargs, unsafe=unsafe
    )
1775
1776
1777 class ESC[4;38;5;81m_patch_dict(ESC[4;38;5;149mobject):
1778 """
1779 Patch a dictionary, or dictionary like object, and restore the dictionary
1780 to its original state after the test.
1781
1782 `in_dict` can be a dictionary or a mapping like container. If it is a
1783 mapping then it must at least support getting, setting and deleting items
1784 plus iterating over keys.
1785
1786 `in_dict` can also be a string specifying the name of the dictionary, which
1787 will then be fetched by importing it.
1788
1789 `values` can be a dictionary of values to set in the dictionary. `values`
1790 can also be an iterable of `(key, value)` pairs.
1791
1792 If `clear` is True then the dictionary will be cleared before the new
1793 values are set.
1794
1795 `patch.dict` can also be called with arbitrary keyword arguments to set
1796 values in the dictionary::
1797
1798 with patch.dict('sys.modules', mymodule=Mock(), other_module=Mock()):
1799 ...
1800
1801 `patch.dict` can be used as a context manager, decorator or class
1802 decorator. When used as a class decorator `patch.dict` honours
1803 `patch.TEST_PREFIX` for choosing which methods to wrap.
1804 """
1805
1806 def __init__(self, in_dict, values=(), clear=False, **kwargs):
1807 self.in_dict = in_dict
1808 # support any argument supported by dict(...) constructor
1809 self.values = dict(values)
1810 self.values.update(kwargs)
1811 self.clear = clear
1812 self._original = None
1813
1814
1815 def __call__(self, f):
1816 if isinstance(f, type):
1817 return self.decorate_class(f)
1818 if inspect.iscoroutinefunction(f):
1819 return self.decorate_async_callable(f)
1820 return self.decorate_callable(f)
1821
1822
1823 def decorate_callable(self, f):
1824 @wraps(f)
1825 def _inner(*args, **kw):
1826 self._patch_dict()
1827 try:
1828 return f(*args, **kw)
1829 finally:
1830 self._unpatch_dict()
1831
1832 return _inner
1833
1834
1835 def decorate_async_callable(self, f):
1836 @wraps(f)
1837 async def _inner(*args, **kw):
1838 self._patch_dict()
1839 try:
1840 return await f(*args, **kw)
1841 finally:
1842 self._unpatch_dict()
1843
1844 return _inner
1845
1846
1847 def decorate_class(self, klass):
1848 for attr in dir(klass):
1849 attr_value = getattr(klass, attr)
1850 if (attr.startswith(patch.TEST_PREFIX) and
1851 hasattr(attr_value, "__call__")):
1852 decorator = _patch_dict(self.in_dict, self.values, self.clear)
1853 decorated = decorator(attr_value)
1854 setattr(klass, attr, decorated)
1855 return klass
1856
1857
1858 def __enter__(self):
1859 """Patch the dict."""
1860 self._patch_dict()
1861 return self.in_dict
1862
1863
1864 def _patch_dict(self):
1865 values = self.values
1866 if isinstance(self.in_dict, str):
1867 self.in_dict = pkgutil.resolve_name(self.in_dict)
1868 in_dict = self.in_dict
1869 clear = self.clear
1870
1871 try:
1872 original = in_dict.copy()
1873 except AttributeError:
1874 # dict like object with no copy method
1875 # must support iteration over keys
1876 original = {}
1877 for key in in_dict:
1878 original[key] = in_dict[key]
1879 self._original = original
1880
1881 if clear:
1882 _clear_dict(in_dict)
1883
1884 try:
1885 in_dict.update(values)
1886 except AttributeError:
1887 # dict like object with no update method
1888 for key in values:
1889 in_dict[key] = values[key]
1890
1891
1892 def _unpatch_dict(self):
1893 in_dict = self.in_dict
1894 original = self._original
1895
1896 _clear_dict(in_dict)
1897
1898 try:
1899 in_dict.update(original)
1900 except AttributeError:
1901 for key in original:
1902 in_dict[key] = original[key]
1903
1904
1905 def __exit__(self, *args):
1906 """Unpatch the dict."""
1907 if self._original is not None:
1908 self._unpatch_dict()
1909 return False
1910
1911
1912 def start(self):
1913 """Activate a patch, returning any created mock."""
1914 result = self.__enter__()
1915 _patch._active_patches.append(self)
1916 return result
1917
1918
1919 def stop(self):
1920 """Stop an active patch."""
1921 try:
1922 _patch._active_patches.remove(self)
1923 except ValueError:
1924 # If the patch hasn't been started this will fail
1925 return None
1926
1927 return self.__exit__(None, None, None)
1928
1929
1930 def _clear_dict(in_dict):
1931 try:
1932 in_dict.clear()
1933 except AttributeError:
1934 keys = list(in_dict)
1935 for key in keys:
1936 del in_dict[key]
1937
1938
def _patch_stopall():
    """Stop all active patches. LIFO to unroll nested patches."""
    active = _patch._active_patches
    # Walk from the most recently started patcher to the oldest.
    for patcher in reversed(active):
        patcher.stop()
1943
1944
# Expose the alternate patchers and the stop-all helper as attributes of
# `patch`, so they are reached as patch.object, patch.dict and so on.
patch.object = _patch_object
patch.dict = _patch_dict
patch.multiple = _patch_multiple
patch.stopall = _patch_stopall
# Name prefix the class decorators use to pick which methods to wrap.
patch.TEST_PREFIX = 'test'
1950
1951 magic_methods = (
1952 "lt le gt ge eq ne "
1953 "getitem setitem delitem "
1954 "len contains iter "
1955 "hash str sizeof "
1956 "enter exit "
1957 # we added divmod and rdivmod here instead of numerics
1958 # because there is no idivmod
1959 "divmod rdivmod neg pos abs invert "
1960 "complex int float index "
1961 "round trunc floor ceil "
1962 "bool next "
1963 "fspath "
1964 "aiter "
1965 )
1966
1967 numerics = (
1968 "add sub mul matmul truediv floordiv mod lshift rshift and xor or pow"
1969 )
1970 inplace = ' '.join('i%s' % n for n in numerics.split())
1971 right = ' '.join('r%s' % n for n in numerics.split())
1972
1973 # not including __prepare__, __instancecheck__, __subclasscheck__
1974 # (as they are metaclass methods)
1975 # __del__ is not supported at all as it causes problems if it exists
1976
1977 _non_defaults = {
1978 '__get__', '__set__', '__delete__', '__reversed__', '__missing__',
1979 '__reduce__', '__reduce_ex__', '__getinitargs__', '__getnewargs__',
1980 '__getstate__', '__setstate__', '__getformat__',
1981 '__repr__', '__dir__', '__subclasses__', '__format__',
1982 '__getnewargs_ex__',
1983 }
1984
1985
1986 def _get_method(name, func):
1987 "Turns a callable object (like a mock) into a real function"
1988 def method(self, /, *args, **kw):
1989 return func(self, *args, **kw)
1990 method.__name__ = name
1991 return method
1992
1993
# Full dunder names for every default magic method: the bare names from
# `magic_methods`, `numerics`, `inplace` and `right` wrapped in `__...__`.
_magics = {
    '__%s__' % method for method in
    ' '.join([magic_methods, numerics, inplace, right]).split()
}

# Magic methods used for async `with` statements
_async_method_magics = {"__aenter__", "__aexit__", "__anext__"}
# Magic methods that are only used with async calls but are synchronous functions themselves
_sync_async_magics = {"__aiter__"}
_async_magics = _async_method_magics | _sync_async_magics

# Unions of the sets above: every synchronous magic, then every magic.
_all_sync_magics = _magics | _non_defaults
_all_magics = _all_sync_magics | _async_magics

# NOTE(review): presumably consulted where magic methods are assigned on a
# mock, to reject these names -- confirm at the point of use.
_unsupported_magics = {
    '__getattr__', '__setattr__',
    '__init__', '__new__', '__prepare__',
    '__instancecheck__', '__subclasscheck__',
    '__del__'
}

# Magic methods whose default return value is computed from the owning
# mock; `_set_return_value` calls the function with that mock.
_calculate_return_value = {
    '__hash__': lambda self: object.__hash__(self),
    '__str__': lambda self: object.__str__(self),
    '__sizeof__': lambda self: object.__sizeof__(self),
    '__fspath__': lambda self: f"{type(self).__name__}/{self._extract_mock_name()}/{id(self)}",
}

# Magic methods with a constant default return value; `_set_return_value`
# checks this table first.
_return_values = {
    '__lt__': NotImplemented,
    '__gt__': NotImplemented,
    '__le__': NotImplemented,
    '__ge__': NotImplemented,
    '__int__': 1,
    '__contains__': False,
    '__len__': 0,
    '__exit__': False,
    '__complex__': 1j,
    '__float__': 1.0,
    '__bool__': True,
    '__index__': 1,
    '__aexit__': False,
}
2037
2038
def _get_eq(self):
    """Build the default ``__eq__`` side effect for the mock ``self``.

    The returned function yields the return value configured on
    ``self.__eq__`` when there is one; otherwise it is ``True`` for an
    identity match and ``NotImplemented`` for anything else.
    """
    def __eq__(other):
        configured = self.__eq__._mock_return_value
        if configured is DEFAULT:
            return True if self is other else NotImplemented
        return configured
    return __eq__
2048
def _get_ne(self):
    """Build the default ``__ne__`` side effect for the mock ``self``.

    The returned function yields ``DEFAULT`` when a return value has been
    configured on ``self.__ne__``; otherwise it is ``False`` for an
    identity match and ``NotImplemented`` for anything else.
    """
    def __ne__(other):
        if self.__ne__._mock_return_value is not DEFAULT:
            # NOTE(review): presumably DEFAULT from a side effect makes the
            # caller fall back to the configured return value -- confirm
            # against the mock call machinery.
            return DEFAULT
        return False if self is other else NotImplemented
    return __ne__
2057
def _get_iter(self):
    """Build the default ``__iter__`` side effect for the mock ``self``.

    The returned function iterates over the return value configured on
    ``self.__iter__``, or over nothing when none is configured.
    """
    def __iter__():
        configured = self.__iter__._mock_return_value
        source = [] if configured is DEFAULT else configured
        # if the configured value was already an iterator, then calling
        # iter on it should return the iterator unchanged
        return iter(source)
    return __iter__
2067
def _get_async_iter(self):
    """Build the default ``__aiter__`` side effect for the mock ``self``.

    The returned function wraps the return value configured on
    ``self.__aiter__`` (or an empty sequence when none is configured) in
    an ``_AsyncIterator``.
    """
    def __aiter__():
        configured = self.__aiter__._mock_return_value
        items = [] if configured is DEFAULT else configured
        return _AsyncIterator(iter(items))
    return __aiter__
2075
# Magic methods whose default behaviour is a side-effect function rather
# than a fixed value. Each factory takes the owning mock and returns the
# function that `_set_return_value` installs as `side_effect`.
_side_effect_methods = {
    '__eq__': _get_eq,
    '__ne__': _get_ne,
    '__iter__': _get_iter,
    '__aiter__': _get_async_iter
}
2082
2083
2084
def _set_return_value(mock, method, name):
    """Give the magic-method mock ``method`` its default behaviour.

    ``name`` is looked up in three tables in turn, and the first hit wins:
    a constant from ``_return_values``, a value computed from ``mock`` by
    ``_calculate_return_value``, or a side effect built for ``mock`` by
    ``_side_effect_methods``. With no hit, ``method`` is left untouched.
    """
    constant = _return_values.get(name, DEFAULT)
    if constant is not DEFAULT:
        method.return_value = constant
        return

    compute = _calculate_return_value.get(name)
    if compute is not None:
        method.return_value = compute(mock)
        return

    make_side_effect = _side_effect_methods.get(name)
    if make_side_effect is None:
        return
    method.side_effect = make_side_effect(mock)
2100
2101
2102
class MagicMixin(Base):
    """Mixin that installs ``MagicProxy`` descriptors for magic methods."""

    def __init__(self, /, *args, **kw):
        # First pass: make magic work for kwargs in init.
        self._mock_set_magics()
        _safe_super(MagicMixin, self).__init__(*args, **kw)
        # Second pass: fix magic broken by upper level init.
        self._mock_set_magics()

    def _mock_set_magics(self):
        """Install or remove magic-method proxies on this mock's type.

        When ``_mock_methods`` is set, only the supported magic names it
        contains are kept; the others are deleted if the type defines them.
        Names the type already defines are never overwritten.
        """
        supported = _magics | _async_method_magics
        wanted = supported

        allowed = getattr(self, "_mock_methods", None)
        if allowed is not None:
            wanted = supported.intersection(allowed)
            for entry in supported - wanted:
                if entry in type(self).__dict__:
                    # remove unneeded magic methods
                    delattr(self, entry)

        klass = type(self)
        # don't overwrite existing attributes if called a second time
        for entry in wanted - set(klass.__dict__):
            setattr(klass, entry, MagicProxy(entry, self))
2131
2132
2133
class NonCallableMagicMock(MagicMixin, NonCallableMock):
    """A version of `MagicMock` that isn't callable."""

    def mock_add_spec(self, spec, spec_set=False):
        """Add a spec to a mock.

        `spec` can either be an object or a list of strings. Only
        attributes on the `spec` can be fetched as attributes from the
        mock.

        If `spec_set` is True then only attributes on the spec can be set.
        """
        self._mock_add_spec(spec, spec_set)
        # The spec may have changed which magic methods are allowed.
        self._mock_set_magics()
2144
2145
class AsyncMagicMixin(MagicMixin):
    """``MagicMixin`` variant used in the bases of ``AsyncMock``."""

    def __init__(self, /, *args, **kw):
        # First pass: make magic work for kwargs in init.
        self._mock_set_magics()
        _safe_super(AsyncMagicMixin, self).__init__(*args, **kw)
        # Second pass: fix magic broken by upper level init.
        self._mock_set_magics()
2151
class MagicMock(MagicMixin, Mock):
    """
    MagicMock is a subclass of Mock with default implementations
    of most of the magic methods. You can use MagicMock without having to
    configure the magic methods yourself.

    If you use the `spec` or `spec_set` arguments then *only* magic
    methods that exist in the spec will be created.

    Attributes and the return value of a `MagicMock` will also be `MagicMocks`.
    """

    def mock_add_spec(self, spec, spec_set=False):
        """Add a spec to a mock.

        `spec` can either be an object or a list of strings. Only
        attributes on the `spec` can be fetched as attributes from the
        mock.

        If `spec_set` is True then only attributes on the spec can be set.
        """
        self._mock_add_spec(spec, spec_set)
        # The spec may have changed which magic methods are allowed.
        self._mock_set_magics()
2171
2172
2173
class MagicProxy(Base):
    """Descriptor that creates a magic-method mock when it is accessed."""

    def __init__(self, name, parent):
        self.name, self.parent = name, parent

    def create_mock(self):
        """Create the child mock, set it on the parent and return it."""
        entry, parent = self.name, self.parent
        child = parent._get_child_mock(
            name=entry, _new_name=entry, _new_parent=parent,
        )
        setattr(parent, entry, child)
        # Apply the default return value or side effect for this magic.
        _set_return_value(parent, child, entry)
        return child

    def __get__(self, obj, _type=None):
        # The mock is always created for the parent given at construction;
        # ``obj`` and ``_type`` are not consulted.
        return self.create_mock()
2190
2191
class AsyncMockMixin(Base):
    """Mixin adding await tracking and await assertions to a mock."""

    await_count = _delegating_property('await_count')
    await_args = _delegating_property('await_args')
    await_args_list = _delegating_property('await_args_list')

    def __init__(self, /, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Everything below is written straight into the instance __dict__
        # because when spec_set is True these attributes are likely
        # undefined on the spec.
        state = self.__dict__
        # iscoroutinefunction() checks the _is_coroutine property to say if
        # an object is a coroutine. Without it, the check looks for a
        # function/method, which an AsyncMock is not.
        state['_is_coroutine'] = asyncio.coroutines._is_coroutine
        state['_mock_await_count'] = 0
        state['_mock_await_args'] = None
        state['_mock_await_args_list'] = _CallList()

        # A stand-in code object describing a coroutine that takes
        # ``*args, **kwargs``.
        code_mock = NonCallableMock(spec_set=CodeType)
        code_attrs = {
            'co_flags': (
                inspect.CO_COROUTINE
                + inspect.CO_VARARGS
                + inspect.CO_VARKEYWORDS
            ),
            'co_argcount': 0,
            'co_varnames': ('args', 'kwargs'),
            'co_posonlyargcount': 0,
            'co_kwonlyargcount': 0,
        }
        for attr, value in code_attrs.items():
            setattr(code_mock, attr, value)

        state['__code__'] = code_mock
        state['__name__'] = 'AsyncMock'
        state['__defaults__'] = tuple()
        state['__kwdefaults__'] = {}
        state['__annotations__'] = None

    async def _execute_mock_call(self, /, *args, **kwargs):
        """Record the await, then resolve the result of the call.

        Resolution order: ``side_effect`` (unless it produces ``DEFAULT``),
        an explicitly configured return value, the wrapped object, and
        finally ``return_value``.
        """
        # This is nearly just like super(), except for special handling
        # of coroutines
        awaited = _Call((args, kwargs), two=True)
        self.await_count += 1
        self.await_args = awaited
        self.await_args_list.append(awaited)

        effect = self.side_effect
        if effect is not None:
            if _is_exception(effect):
                raise effect

            if _callable(effect):
                if iscoroutinefunction(effect):
                    outcome = await effect(*args, **kwargs)
                else:
                    outcome = effect(*args, **kwargs)
            else:
                # Not callable: treat the side effect as an iterator.
                try:
                    outcome = next(effect)
                except StopIteration:
                    # It is impossible to propagate a StopIteration
                    # through coroutines because of PEP 479
                    raise StopAsyncIteration
                if _is_exception(outcome):
                    raise outcome

            if outcome is not DEFAULT:
                return outcome

        if self._mock_return_value is not DEFAULT:
            return self.return_value

        wrapped = self._mock_wraps
        if wrapped is not None:
            if iscoroutinefunction(wrapped):
                return await wrapped(*args, **kwargs)
            return wrapped(*args, **kwargs)

        return self.return_value

    def assert_awaited(self):
        """
        Assert that the mock was awaited at least once.
        """
        if self.await_count == 0:
            raise AssertionError(
                f"Expected {self._mock_name or 'mock'} to have been awaited."
            )

    def assert_awaited_once(self):
        """
        Assert that the mock was awaited exactly once.
        """
        if not self.await_count == 1:
            raise AssertionError(
                f"Expected {self._mock_name or 'mock'} to have been awaited once."
                f" Awaited {self.await_count} times."
            )

    def assert_awaited_with(self, /, *args, **kwargs):
        """
        Assert that the last await was with the specified arguments.
        """
        if self.await_args is None:
            expected = self._format_mock_call_signature(args, kwargs)
            raise AssertionError(f'Expected await: {expected}\nNot awaited')

        expected = self._call_matcher(_Call((args, kwargs), two=True))
        actual = self._call_matcher(self.await_args)
        if actual != expected:
            # The matcher can hand back an exception in place of a call;
            # chain it so the underlying problem stays visible.
            cause = expected if isinstance(expected, Exception) else None
            raise AssertionError(
                self._format_mock_failure_message(args, kwargs, action='await')
            ) from cause

    def assert_awaited_once_with(self, /, *args, **kwargs):
        """
        Assert that the mock was awaited exactly once and with the specified
        arguments.
        """
        if not self.await_count == 1:
            raise AssertionError(
                f"Expected {self._mock_name or 'mock'} to have been awaited once."
                f" Awaited {self.await_count} times."
            )
        return self.assert_awaited_with(*args, **kwargs)

    def assert_any_await(self, /, *args, **kwargs):
        """
        Assert the mock has ever been awaited with the specified arguments.
        """
        expected = self._call_matcher(_Call((args, kwargs), two=True))
        cause = expected if isinstance(expected, Exception) else None
        recorded = [self._call_matcher(c) for c in self.await_args_list]
        if cause or expected not in _AnyComparer(recorded):
            expected_string = self._format_mock_call_signature(args, kwargs)
            raise AssertionError(
                '%s await not found' % expected_string
            ) from cause

    def assert_has_awaits(self, calls, any_order=False):
        """
        Assert the mock has been awaited with the specified calls.
        The :attr:`await_args_list` list is checked for the awaits.

        If `any_order` is False (the default) then the awaits must be
        sequential. There can be extra calls before or after the
        specified awaits.

        If `any_order` is True then the awaits can be in any order, but
        they must all appear in :attr:`await_args_list`.
        """
        expected = [self._call_matcher(c) for c in calls]
        cause = next((e for e in expected if isinstance(e, Exception)), None)
        all_awaits = _CallList(self._call_matcher(c) for c in self.await_args_list)

        if any_order:
            # Tick each expected await off a working copy so duplicates
            # must be matched by distinct recorded awaits.
            remaining = list(all_awaits)
            not_found = []
            for kall in expected:
                try:
                    remaining.remove(kall)
                except ValueError:
                    not_found.append(kall)
            if not_found:
                raise AssertionError(
                    '%r not all found in await list' % (tuple(not_found),)
                ) from cause
            return

        if expected not in all_awaits:
            if cause is None:
                problem = 'Awaits not found.'
            else:
                problem = ('Error processing expected awaits.\n'
                           'Errors: {}').format(
                               [e if isinstance(e, Exception) else None
                                for e in expected])
            raise AssertionError(
                f'{problem}\n'
                f'Expected: {_CallList(calls)}\n'
                f'Actual: {self.await_args_list}'
            ) from cause

    def assert_not_awaited(self):
        """
        Assert that the mock was never awaited.
        """
        if self.await_count != 0:
            raise AssertionError(
                f"Expected {self._mock_name or 'mock'} to not have been awaited."
                f" Awaited {self.await_count} times."
            )

    def reset_mock(self, /, *args, **kwargs):
        """
        See :func:`.Mock.reset_mock()`
        """
        super().reset_mock(*args, **kwargs)
        # Also clear the await bookkeeping kept by this mixin.
        self.await_count = 0
        self.await_args = None
        self.await_args_list = _CallList()
2385
2386
class AsyncMock(AsyncMockMixin, AsyncMagicMixin, Mock):
    """
    Enhance :class:`Mock` with features allowing to mock
    an async function.

    The :class:`AsyncMock` object will behave so the object is
    recognized as an async function, and the result of a call is an awaitable:

    >>> mock = AsyncMock()
    >>> iscoroutinefunction(mock)
    True
    >>> inspect.isawaitable(mock())
    True


    The result of ``mock()`` is an async function which will have the outcome
    of ``side_effect`` or ``return_value``:

    - if ``side_effect`` is a function, the async function will return the
      result of that function,
    - if ``side_effect`` is an exception, the async function will raise the
      exception,
    - if ``side_effect`` is an iterable, the async function will return the
      next value of the iterable, however, if the sequence of result is
      exhausted, ``StopAsyncIteration`` is raised immediately,
    - if ``side_effect`` is not defined, the async function will return the
      value defined by ``return_value``, hence, by default, the async function
      returns a new :class:`AsyncMock` object.

    If the outcome of ``side_effect`` or ``return_value`` is an async function,
    the mock async function obtained when the mock object is called will be this
    async function itself (and not an async function returning an async
    function).

    The test author can also specify a wrapped object with ``wraps``. In this
    case, the :class:`Mock` object behavior is the same as with an
    :class:`.Mock` object: the wrapped object may have methods
    defined as async functions.

    Based on Martin Richard's asynctest project.
    """
2428
2429
2430 class ESC[4;38;5;81m_ANY(ESC[4;38;5;149mobject):
2431 "A helper object that compares equal to everything."
2432
2433 def __eq__(self, other):
2434 return True
2435
2436 def __ne__(self, other):
2437 return False
2438
2439 def __repr__(self):
2440 return '<ANY>'
2441
# Module-level instance of `_ANY`, exported as `ANY` via `__all__`.
ANY = _ANY()
2443
2444
2445
2446 def _format_call_signature(name, args, kwargs):
2447 message = '%s(%%s)' % name
2448 formatted_args = ''
2449 args_string = ', '.join([repr(arg) for arg in args])
2450 kwargs_string = ', '.join([
2451 '%s=%r' % (key, value) for key, value in kwargs.items()
2452 ])
2453 if args_string:
2454 formatted_args = args_string
2455 if kwargs_string:
2456 if formatted_args:
2457 formatted_args += ', '
2458 formatted_args += kwargs_string
2459
2460 return message % formatted_args
2461
2462
2463
2464 class ESC[4;38;5;81m_Call(ESC[4;38;5;149mtuple):
2465 """
2466 A tuple for holding the results of a call to a mock, either in the form
2467 `(args, kwargs)` or `(name, args, kwargs)`.
2468
2469 If args or kwargs are empty then a call tuple will compare equal to
2470 a tuple without those values. This makes comparisons less verbose::
2471
2472 _Call(('name', (), {})) == ('name',)
2473 _Call(('name', (1,), {})) == ('name', (1,))
2474 _Call(((), {'a': 'b'})) == ({'a': 'b'},)
2475
2476 The `_Call` object provides a useful shortcut for comparing with call::
2477
2478 _Call(((1, 2), {'a': 3})) == call(1, 2, a=3)
2479 _Call(('foo', (1, 2), {'a': 3})) == call.foo(1, 2, a=3)
2480
2481 If the _Call has no name then it will match any name.
2482 """
2483 def __new__(cls, value=(), name='', parent=None, two=False,
2484 from_kall=True):
2485 args = ()
2486 kwargs = {}
2487 _len = len(value)
2488 if _len == 3:
2489 name, args, kwargs = value
2490 elif _len == 2:
2491 first, second = value
2492 if isinstance(first, str):
2493 name = first
2494 if isinstance(second, tuple):
2495 args = second
2496 else:
2497 kwargs = second
2498 else:
2499 args, kwargs = first, second
2500 elif _len == 1:
2501 value, = value
2502 if isinstance(value, str):
2503 name = value
2504 elif isinstance(value, tuple):
2505 args = value
2506 else:
2507 kwargs = value
2508
2509 if two:
2510 return tuple.__new__(cls, (args, kwargs))
2511
2512 return tuple.__new__(cls, (name, args, kwargs))
2513
2514
2515 def __init__(self, value=(), name=None, parent=None, two=False,
2516 from_kall=True):
2517 self._mock_name = name
2518 self._mock_parent = parent
2519 self._mock_from_kall = from_kall
2520
2521
2522 def __eq__(self, other):
2523 try:
2524 len_other = len(other)
2525 except TypeError:
2526 return NotImplemented
2527
2528 self_name = ''
2529 if len(self) == 2:
2530 self_args, self_kwargs = self
2531 else:
2532 self_name, self_args, self_kwargs = self
2533
2534 if (getattr(self, '_mock_parent', None) and getattr(other, '_mock_parent', None)
2535 and self._mock_parent != other._mock_parent):
2536 return False
2537
2538 other_name = ''
2539 if len_other == 0:
2540 other_args, other_kwargs = (), {}
2541 elif len_other == 3:
2542 other_name, other_args, other_kwargs = other
2543 elif len_other == 1:
2544 value, = other
2545 if isinstance(value, tuple):
2546 other_args = value
2547 other_kwargs = {}
2548 elif isinstance(value, str):
2549 other_name = value
2550 other_args, other_kwargs = (), {}
2551 else:
2552 other_args = ()
2553 other_kwargs = value
2554 elif len_other == 2:
2555 # could be (name, args) or (name, kwargs) or (args, kwargs)
2556 first, second = other
2557 if isinstance(first, str):
2558 other_name = first
2559 if isinstance(second, tuple):
2560 other_args, other_kwargs = second, {}
2561 else:
2562 other_args, other_kwargs = (), second
2563 else:
2564 other_args, other_kwargs = first, second
2565 else:
2566 return False
2567
2568 if self_name and other_name != self_name:
2569 return False
2570
2571 # this order is important for ANY to work!
2572 return (other_args, other_kwargs) == (self_args, self_kwargs)
2573
2574
2575 __ne__ = object.__ne__
2576
2577
2578 def __call__(self, /, *args, **kwargs):
2579 if self._mock_name is None:
2580 return _Call(('', args, kwargs), name='()')
2581
2582 name = self._mock_name + '()'
2583 return _Call((self._mock_name, args, kwargs), name=name, parent=self)
2584
2585
2586 def __getattr__(self, attr):
2587 if self._mock_name is None:
2588 return _Call(name=attr, from_kall=False)
2589 name = '%s.%s' % (self._mock_name, attr)
2590 return _Call(name=name, parent=self, from_kall=False)
2591
2592
2593 def __getattribute__(self, attr):
2594 if attr in tuple.__dict__:
2595 raise AttributeError
2596 return tuple.__getattribute__(self, attr)
2597
2598
2599 def _get_call_arguments(self):
2600 if len(self) == 2:
2601 args, kwargs = self
2602 else:
2603 name, args, kwargs = self
2604
2605 return args, kwargs
2606
2607 @property
2608 def args(self):
2609 return self._get_call_arguments()[0]
2610
2611 @property
2612 def kwargs(self):
2613 return self._get_call_arguments()[1]
2614
2615 def __repr__(self):
2616 if not self._mock_from_kall:
2617 name = self._mock_name or 'call'
2618 if name.startswith('()'):
2619 name = 'call%s' % name
2620 return name
2621
2622 if len(self) == 2:
2623 name = 'call'
2624 args, kwargs = self
2625 else:
2626 name, args, kwargs = self
2627 if not name:
2628 name = 'call'
2629 elif not name.startswith('()'):
2630 name = 'call.%s' % name
2631 else:
2632 name = 'call%s' % name
2633 return _format_call_signature(name, args, kwargs)
2634
2635
2636 def call_list(self):
2637 """For a call object that represents multiple calls, `call_list`
2638 returns a list of all the intermediate calls as well as the
2639 final call."""
2640 vals = []
2641 thing = self
2642 while thing is not None:
2643 if thing._mock_from_kall:
2644 vals.append(thing)
2645 thing = thing._mock_parent
2646 return _CallList(reversed(vals))
2647
2648
# Module-level `_Call`, exported as `call` via `__all__`. It is created
# with from_kall=False, so its repr is the bare name 'call' rather than a
# call signature.
call = _Call(from_kall=False)
2650
2651
def create_autospec(spec, spec_set=False, instance=False, _parent=None,
                    _name=None, *, unsafe=False, **kwargs):
    """Create a mock object using another object as a spec. Attributes on the
    mock will use the corresponding attribute on the `spec` object as their
    spec.

    Functions or methods being mocked will have their arguments checked
    to check that they are called with the correct signature.

    If `spec_set` is True then attempting to set attributes that don't exist
    on the spec object will raise an `AttributeError`.

    If a class is used as a spec then the return value of the mock (the
    instance of the class) will have the same spec. You can use a class as the
    spec for an instance object by passing `instance=True`. The returned mock
    will only be callable if instances of the mock are callable.

    `create_autospec` will raise a `RuntimeError` if passed some common
    misspellings of the arguments autospec and spec_set. Pass the argument
    `unsafe` with the value True to disable that check.

    `create_autospec` also takes arbitrary keyword arguments that are passed to
    the constructor of the created mock.

    `_parent` and `_name` are used by the recursive call below that builds
    the instance mock for a class spec.

    Raises `InvalidSpecError` if `spec` is itself a mock, and `RuntimeError`
    if `instance` is true while `spec` is an async function."""
    if _is_list(spec):
        # can't pass a list instance to the mock constructor as it will be
        # interpreted as a list of strings
        spec = type(spec)

    is_type = isinstance(spec, type)
    if _is_instance_mock(spec):
        raise InvalidSpecError(f'Cannot autospec a Mock object. '
                               f'[object={spec!r}]')
    is_async_func = _is_async_func(spec)
    # Constructor keyword arguments for the top-level mock.
    _kwargs = {'spec': spec}
    if spec_set:
        _kwargs = {'spec_set': spec}
    elif spec is None:
        # None we mock with a normal mock without a spec
        _kwargs = {}
    if _kwargs and instance:
        _kwargs['_spec_as_instance'] = True
    if not unsafe:
        _check_spec_arg_typos(kwargs)

    # Caller-supplied keyword arguments override the computed ones.
    _kwargs.update(kwargs)

    # Choose the mock class from the kind of object being specced.
    Klass = MagicMock
    if inspect.isdatadescriptor(spec):
        # descriptors don't have a spec
        # because we don't know what type they return
        _kwargs = {}
    elif is_async_func:
        if instance:
            raise RuntimeError("Instance can not be True when create_autospec "
                               "is mocking an async function")
        Klass = AsyncMock
    elif not _callable(spec):
        Klass = NonCallableMagicMock
    elif is_type and instance and not _instance_callable(spec):
        Klass = NonCallableMagicMock

    # An explicit name= keyword argument takes precedence over _name.
    _name = _kwargs.pop('name', _name)

    _new_name = _name
    if _parent is None:
        # for a top level object no _new_name should be set
        _new_name = ''

    mock = Klass(parent=_parent, _new_parent=_parent, _new_name=_new_name,
                 name=_name, **_kwargs)

    if isinstance(spec, FunctionTypes):
        # should only happen at the top level because we don't
        # recurse for functions
        mock = _set_signature(mock, spec)
        if is_async_func:
            _setup_async_mock(mock)
    else:
        _check_signature(spec, mock, is_type, instance)

    if _parent is not None and not instance:
        _parent._mock_children[_name] = mock

    # For a class spec, calling the mock returns an instance-specced mock,
    # unless the caller supplied an explicit return_value.
    if is_type and not instance and 'return_value' not in kwargs:
        mock.return_value = create_autospec(spec, spec_set, instance=True,
                                            _name='()', _parent=mock)

    for entry in dir(spec):
        if _is_magic(entry):
            # MagicMock already does the useful magic methods for us
            continue

        # XXXX do we need a better way of getting attributes without
        # triggering code execution (?) Probably not - we need the actual
        # object to mock it so we would rather trigger a property than mock
        # the property descriptor. Likewise we want to mock out dynamically
        # provided attributes.
        # XXXX what about attributes that raise exceptions other than
        # AttributeError on being fetched?
        # we could be resilient against it, or catch and propagate the
        # exception when the attribute is fetched from the mock
        try:
            original = getattr(spec, entry)
        except AttributeError:
            continue

        # From here on `kwargs` is rebound to the child mock's constructor
        # arguments; the caller's keyword arguments are no longer needed.
        kwargs = {'spec': original}
        if spec_set:
            kwargs = {'spec_set': original}

        if not isinstance(original, FunctionTypes):
            # Non-function attributes get a _SpecState placeholder that
            # records how to spec them, instead of a mock built here.
            new = _SpecState(original, spec_set, mock, entry, instance)
            mock._mock_children[entry] = new
        else:
            parent = mock
            if isinstance(spec, FunctionTypes):
                parent = mock.mock

            skipfirst = _must_skip(spec, entry, is_type)
            kwargs['_eat_self'] = skipfirst
            if iscoroutinefunction(original):
                child_klass = AsyncMock
            else:
                child_klass = MagicMock
            new = child_klass(parent=parent, name=entry, _new_name=entry,
                              _new_parent=parent,
                              **kwargs)
            mock._mock_children[entry] = new
            _check_signature(original, new, skipfirst=skipfirst)

        # so functions created with _set_signature become instance attributes,
        # *plus* their underlying mock exists in _mock_children of the parent
        # mock. Adding to _mock_children may be unnecessary where we are also
        # setting as an instance attribute?
        if isinstance(new, FunctionTypes):
            setattr(mock, entry, new)

    return mock
2790
2791
def _must_skip(spec, entry, is_type):
    """
    Return whether we should skip the first argument on spec's `entry`
    attribute.
    """
    owner = spec
    if not isinstance(spec, type):
        if entry in getattr(spec, '__dict__', {}):
            # instance attribute - shouldn't skip
            return False
        owner = spec.__class__

    # The first class in the MRO that defines `entry` decides the answer.
    for klass in owner.__mro__:
        found = klass.__dict__.get(entry, DEFAULT)
        if found is DEFAULT:
            continue
        if isinstance(found, (staticmethod, classmethod)):
            return False
        if isinstance(found, FunctionTypes):
            # Normal method => skip if looked up on type
            # (if looked up on instance, self is already skipped)
            return is_type
        return False

    # function is a dynamically provided attribute
    return is_type
2818
2819
2820 class ESC[4;38;5;81m_SpecState(ESC[4;38;5;149mobject):
2821
2822 def __init__(self, spec, spec_set=False, parent=None,
2823 name=None, ids=None, instance=False):
2824 self.spec = spec
2825 self.ids = ids
2826 self.spec_set = spec_set
2827 self.parent = parent
2828 self.instance = instance
2829 self.name = name
2830
2831
# Types treated as "functions" by create_autospec and _must_skip. They are
# derived from live objects rather than imported from `types`.
FunctionTypes = (
    # python function
    type(create_autospec),
    # instance method
    type(ANY.__eq__),
)


# Attribute-name lists used as specs by mock_open; filled in on its first
# call and reused afterwards.
file_spec = None
open_spec = None
2842
2843
2844 def _to_stream(read_data):
2845 if isinstance(read_data, bytes):
2846 return io.BytesIO(read_data)
2847 else:
2848 return io.StringIO(read_data)
2849
2850
def mock_open(mock=None, read_data=''):
    """
    A helper function to create a mock to replace the use of `open`. It works
    for `open` called directly or used as a context manager.

    The `mock` argument is the mock object to configure. If `None` (the
    default) then a `MagicMock` will be created for you, with the API limited
    to methods or attributes available on standard file handles.

    `read_data` is a string for the `read`, `readline` and `readlines` of the
    file handle to return. This is an empty string by default.
    """
    # _state[0] is the current data stream and _state[1] the generator
    # installed as readline's side effect. A list is used so the closures
    # below always see the values replaced by reset_data.
    _state = [_to_stream(read_data), None]

    def _readlines_side_effect(*args, **kwargs):
        preset = handle.readlines.return_value
        if preset is not None:
            return preset
        return _state[0].readlines(*args, **kwargs)

    def _read_side_effect(*args, **kwargs):
        preset = handle.read.return_value
        if preset is not None:
            return preset
        return _state[0].read(*args, **kwargs)

    def _readline_side_effect(*args, **kwargs):
        yield from _iter_side_effect()
        # Once the data is used up, keep answering with whatever the
        # stream's readline gives.
        while True:
            yield _state[0].readline(*args, **kwargs)

    def _iter_side_effect():
        if handle.readline.return_value is not None:
            # A configured readline return value is repeated forever.
            while True:
                yield handle.readline.return_value
        yield from _state[0]

    def _next_side_effect():
        preset = handle.readline.return_value
        if preset is not None:
            return preset
        return next(_state[0])

    # The spec lists are computed once and cached at module level.
    global file_spec, open_spec
    if file_spec is None:
        import _io
        file_spec = list(set(dir(_io.TextIOWrapper)).union(set(dir(_io.BytesIO))))

    if open_spec is None:
        import _io
        open_spec = list(set(dir(_io.open)))

    if mock is None:
        mock = MagicMock(name='open', spec=open_spec)

    handle = MagicMock(spec=file_spec)
    handle.__enter__.return_value = handle

    # None means "not overridden": the side effects above then fall back
    # to the data stream.
    for method_name in ('write', 'read', 'readline', 'readlines'):
        getattr(handle, method_name).return_value = None

    handle.read.side_effect = _read_side_effect
    _state[1] = _readline_side_effect()
    handle.readline.side_effect = _state[1]
    handle.readlines.side_effect = _readlines_side_effect
    handle.__iter__.side_effect = _iter_side_effect
    handle.__next__.side_effect = _next_side_effect

    def reset_data(*args, **kwargs):
        # Installed as the side effect of `mock`: every call starts again
        # from the beginning of read_data.
        _state[0] = _to_stream(read_data)
        if handle.readline.side_effect == _state[1]:
            # Only reset the side effect if the user hasn't overridden it.
            _state[1] = _readline_side_effect()
            handle.readline.side_effect = _state[1]
        return DEFAULT

    mock.side_effect = reset_data
    mock.return_value = handle
    return mock
2931
2932
class PropertyMock(Mock):
    """
    A mock intended to be used as a property, or other descriptor, on a class.
    `PropertyMock` provides `__get__` and `__set__` methods so you can specify
    a return value when it is fetched.

    Fetching a `PropertyMock` instance from an object calls the mock, with
    no args. Setting it calls the mock with the value being set.
    """

    def _get_child_mock(self, /, **kwargs):
        # Children are plain MagicMocks rather than PropertyMocks.
        return MagicMock(**kwargs)

    def __get__(self, obj, obj_type=None):
        # Reading the attribute is recorded as a call with no arguments.
        return self()

    def __set__(self, obj, val):
        # Writing the attribute is recorded as a call with the new value.
        self(val)
2949
2950
def seal(mock):
    """Disable the automatic generation of child mocks.

    Given an input Mock, seals it to ensure no further mocks will be generated
    when accessing an attribute that was not already defined.

    The operation recursively seals the mock passed in, meaning that
    the mock itself, any mocks generated by accessing one of its attributes,
    and all assigned mocks without a name or spec will be sealed.
    """
    mock._mock_sealed = True
    for attr in dir(mock):
        try:
            child = getattr(mock, attr)
        except AttributeError:
            continue
        # Recurse only into real mocks that belong to this mock and are
        # not tracked as a `_SpecState` placeholder.
        if (isinstance(child, NonCallableMock)
                and not isinstance(child._mock_children.get(attr), _SpecState)
                and child._mock_new_parent is mock):
            seal(child)
2973
2974
class _AsyncIterator:
    """
    Wraps an iterator in an asynchronous iterator.
    """

    def __init__(self, iterator):
        self.iterator = iterator
        # Stand-in code object whose flags carry CO_ITERABLE_COROUTINE,
        # stored directly in the instance __dict__.
        code_mock = NonCallableMock(spec_set=CodeType)
        code_mock.co_flags = inspect.CO_ITERABLE_COROUTINE
        self.__dict__['__code__'] = code_mock

    async def __anext__(self):
        # A private sentinel tells "iterator exhausted" apart from any
        # value the iterator may legitimately yield, and lets
        # StopAsyncIteration be raised outside of exception handling.
        exhausted = object()
        item = next(self.iterator, exhausted)
        if item is exhausted:
            raise StopAsyncIteration
        return item