python (3.12.0)
1 # mock.py
2 # Test tools for mocking and patching.
3 # Maintained by Michael Foord
4 # Backport for other versions of Python available from
5 # https://pypi.org/project/mock
6
# Names exported by a star-import of this module.
__all__ = (
    'Mock',
    'MagicMock',
    'patch',
    'sentinel',
    'DEFAULT',
    'ANY',
    'call',
    'create_autospec',
    'AsyncMock',
    'FILTER_DIR',
    'NonCallableMock',
    'NonCallableMagicMock',
    'mock_open',
    'PropertyMock',
    'seal',
)
24
25
26 import asyncio
27 import contextlib
28 import io
29 import inspect
30 import pprint
31 import sys
32 import builtins
33 import pkgutil
34 from asyncio import iscoroutinefunction
35 from types import CodeType, ModuleType, MethodType
36 from unittest.util import safe_repr
37 from functools import wraps, partial
38 from threading import RLock
39
40
class InvalidSpecError(Exception):
    """Raised when the object supplied as a mock spec is not acceptable."""
43
44
# Names of every attribute of the `builtins` module that does not start with
# an underscore.
_builtins = {name for name in dir(builtins) if not name.startswith('_')}

# Switch consulted by NonCallableMock.__dir__: when false, dir(mock) returns
# the unfiltered object.__dir__() result.
FILTER_DIR = True

# Workaround for issue #12370
# Without this, the __class__ properties wouldn't be set correctly
_safe_super = super
52
def _is_async_obj(obj):
    """Report whether *obj* is a coroutine function or an awaitable.

    A mock instance only counts as async when it is an AsyncMock.  An
    object exposing ``__func__`` is judged by that underlying function.
    """
    if _is_instance_mock(obj) and not isinstance(obj, AsyncMock):
        return False
    target = obj.__func__ if hasattr(obj, '__func__') else obj
    return iscoroutinefunction(target) or inspect.isawaitable(target)
59
60
61 def _is_async_func(func):
62 if getattr(func, '__code__', None):
63 return iscoroutinefunction(func)
64 else:
65 return False
66
67
def _is_instance_mock(obj):
    """Return True when *obj*'s real type derives from NonCallableMock."""
    # isinstance() cannot be trusted here: mocks override __class__, so the
    # check is made against the actual type.  NonCallableMock is the base
    # class of every mock.
    real_type = type(obj)
    return issubclass(real_type, NonCallableMock)
72
73
74 def _is_exception(obj):
75 return (
76 isinstance(obj, BaseException) or
77 isinstance(obj, type) and issubclass(obj, BaseException)
78 )
79
80
def _extract_mock(obj):
    """Return the mock behind an autospecced function, else *obj* itself."""
    # An autospecced function is a FunctionType carrying the real mock in
    # its "mock" attribute; that mock is the object callers need.
    wraps_a_mock = isinstance(obj, FunctionTypes) and hasattr(obj, 'mock')
    return obj.mock if wraps_a_mock else obj
88
89
90 def _get_signature_object(func, as_instance, eat_self):
91 """
92 Given an arbitrary, possibly callable object, try to create a suitable
93 signature object.
94 Return a (reduced func, signature) tuple, or None.
95 """
96 if isinstance(func, type) and not as_instance:
97 # If it's a type and should be modelled as a type, use __init__.
98 func = func.__init__
99 # Skip the `self` argument in __init__
100 eat_self = True
101 elif isinstance(func, (classmethod, staticmethod)):
102 if isinstance(func, classmethod):
103 # Skip the `cls` argument of a class method
104 eat_self = True
105 # Use the original decorated method to extract the correct function signature
106 func = func.__func__
107 elif not isinstance(func, FunctionTypes):
108 # If we really want to model an instance of the passed type,
109 # __call__ should be looked up, not __init__.
110 try:
111 func = func.__call__
112 except AttributeError:
113 return None
114 if eat_self:
115 sig_func = partial(func, None)
116 else:
117 sig_func = func
118 try:
119 return func, inspect.signature(sig_func)
120 except ValueError:
121 # Certain callable types are not supported by inspect.signature()
122 return None
123
124
def _check_signature(func, mock, skipfirst, instance=False):
    """Install a signature check for *func* on the type of *mock*.

    Does nothing when no signature can be obtained for *func*.
    """
    found = _get_signature_object(func, instance, skipfirst)
    if found is None:
        return
    func, sig = found

    def checksig(self, /, *args, **kwargs):
        sig.bind(*args, **kwargs)

    _copy_func_details(func, checksig)
    mock_type = type(mock)
    mock_type._mock_check_sig = checksig
    mock_type.__signature__ = sig
135
136
137 def _copy_func_details(func, funcopy):
138 # we explicitly don't copy func.__dict__ into this copy as it would
139 # expose original attributes that should be mocked
140 for attribute in (
141 '__name__', '__doc__', '__text_signature__',
142 '__module__', '__defaults__', '__kwdefaults__',
143 ):
144 try:
145 setattr(funcopy, attribute, getattr(func, attribute))
146 except AttributeError:
147 pass
148
149
150 def _callable(obj):
151 if isinstance(obj, type):
152 return True
153 if isinstance(obj, (staticmethod, classmethod, MethodType)):
154 return _callable(obj.__func__)
155 if getattr(obj, '__call__', None) is not None:
156 return True
157 return False
158
159
160 def _is_list(obj):
161 # checks for list or tuples
162 # XXXX badly named!
163 return type(obj) in (list, tuple)
164
165
166 def _instance_callable(obj):
167 """Given an object, return True if the object is callable.
168 For classes, return True if instances would be callable."""
169 if not isinstance(obj, type):
170 # already an instance
171 return getattr(obj, '__call__', None) is not None
172
173 # *could* be broken by a class overriding __mro__ or __dict__ via
174 # a metaclass
175 for base in (obj,) + obj.__mro__:
176 if base.__dict__.get('__call__') is not None:
177 return True
178 return False
179
180
def _set_signature(mock, original, instance=False):
    """Wrap *mock* in a real function that checks *original*'s signature.

    The generated function accepts ``(*args, **kwargs)``, validates them by
    binding against the signature obtained for *original*, then delegates
    to *mock*.  When no signature can be obtained, *mock* itself is
    returned unchanged.

    The wrapper is named after ``original.__name__`` when that name can be
    used safely in the generated source; otherwise it is called
    ``funcopy``.
    """
    import keyword

    skipfirst = isinstance(original, type)
    result = _get_signature_object(original, instance, skipfirst)
    if result is None:
        return mock
    func, sig = result
    def checksig(*args, **kwargs):
        sig.bind(*args, **kwargs)
    _copy_func_details(func, checksig)

    context = {'_checksig_': checksig, 'mock': mock}
    name = original.__name__
    # The generated `def` binds its own name inside `context`.  A name that
    # matches one of the helper keys would overwrite that helper (a wrapper
    # called "mock" would end up calling itself forever), and a keyword
    # would make the generated source a SyntaxError.  Fall back to the
    # neutral name in those cases, as for non-identifier names.
    if (not name.isidentifier() or keyword.iskeyword(name)
            or name in context):
        name = 'funcopy'
    src = """def %s(*args, **kwargs):
    _checksig_(*args, **kwargs)
    return mock(*args, **kwargs)""" % name
    exec (src, context)
    funcopy = context[name]
    _setup_func(funcopy, mock, sig)
    return funcopy
206
207
def _setup_func(funcopy, mock, sig):
    """Give the function *funcopy* the mock-like surface of *mock*.

    Assertion helpers and ``reset_mock`` are attached to *funcopy* as
    forwarders, fresh call-recording attributes are set on it, and
    *funcopy* is registered as the delegate of *mock*.
    """
    funcopy.mock = mock

    # Forwarders to the corresponding methods of the wrapped mock.
    def assert_called(*args, **kwargs):
        return mock.assert_called(*args, **kwargs)
    def assert_not_called(*args, **kwargs):
        return mock.assert_not_called(*args, **kwargs)
    def assert_called_once(*args, **kwargs):
        return mock.assert_called_once(*args, **kwargs)
    def assert_called_with(*args, **kwargs):
        return mock.assert_called_with(*args, **kwargs)
    def assert_called_once_with(*args, **kwargs):
        return mock.assert_called_once_with(*args, **kwargs)
    def assert_has_calls(*args, **kwargs):
        return mock.assert_has_calls(*args, **kwargs)
    def assert_any_call(*args, **kwargs):
        return mock.assert_any_call(*args, **kwargs)

    def reset_mock():
        funcopy.method_calls = _CallList()
        funcopy.mock_calls = _CallList()
        mock.reset_mock()
        returned = funcopy.return_value
        if _is_instance_mock(returned) and returned is not mock:
            returned.reset_mock()

    # Call-recording state starts out empty.
    funcopy.called = False
    funcopy.call_count = 0
    funcopy.call_args = None
    funcopy.call_args_list = _CallList()
    funcopy.method_calls = _CallList()
    funcopy.mock_calls = _CallList()

    # Configuration is taken from the mock before it starts delegating.
    funcopy.return_value = mock.return_value
    funcopy.side_effect = mock.side_effect
    funcopy._mock_children = mock._mock_children

    funcopy.assert_called = assert_called
    funcopy.assert_not_called = assert_not_called
    funcopy.assert_called_once = assert_called_once
    funcopy.assert_called_with = assert_called_with
    funcopy.assert_called_once_with = assert_called_once_with
    funcopy.assert_has_calls = assert_has_calls
    funcopy.assert_any_call = assert_any_call
    funcopy.reset_mock = reset_mock
    funcopy.__signature__ = sig

    mock._mock_delegate = funcopy
255
256
def _setup_async_mock(mock):
    """Attach await-tracking attributes and assertion helpers to *mock*."""
    mock._is_coroutine = asyncio.coroutines._is_coroutine
    mock.await_count = 0
    mock.await_args = None
    mock.await_args_list = _CallList()

    # Mock is not configured yet so the attributes are set to a function;
    # the corresponding helper on mock.mock is looked up only when the
    # attribute is actually called, similar to _setup_func.
    def wrapper(attr, /, *args, **kwargs):
        return getattr(mock.mock, attr)(*args, **kwargs)

    helper_names = (
        'assert_awaited',
        'assert_awaited_once',
        'assert_awaited_with',
        'assert_awaited_once_with',
        'assert_any_await',
        'assert_has_awaits',
        'assert_not_awaited',
    )
    for helper_name in helper_names:
        # partial() binds the name now; a plain closure over the loop
        # variable would late-bind and always see the last name.
        setattr(mock, helper_name, partial(wrapper, helper_name))
282
283
284 def _is_magic(name):
285 return '__%s__' % name[2:-2] == name
286
287
288 class ESC[4;38;5;81m_SentinelObject(ESC[4;38;5;149mobject):
289 "A unique, named, sentinel object."
290 def __init__(self, name):
291 self.name = name
292
293 def __repr__(self):
294 return 'sentinel.%s' % self.name
295
296 def __reduce__(self):
297 return 'sentinel.%s' % self.name
298
299
class _Sentinel:
    """Access attributes to return a named object, usable as a sentinel."""

    def __init__(self):
        # name -> _SentinelObject, so repeated lookups give the same object
        self._sentinels = {}

    def __getattr__(self, name):
        # Without this guard help(unittest.mock) raises an exception
        if name == '__bases__':
            raise AttributeError
        return self._sentinels.setdefault(name, _SentinelObject(name))

    def __reduce__(self):
        return 'sentinel'
313
314
# The module's sentinel factory: attribute access returns a named sentinel.
sentinel = _Sentinel()

# Default stored in Base._mock_return_value; the return_value getter creates
# a child mock when it sees this value.
DEFAULT = sentinel.DEFAULT
# Used by __delattr__ as the "no such child" default when probing
# _mock_children.
_missing = sentinel.MISSING
# Stored in _mock_children by __delattr__ to mark a deleted attribute.
_deleted = sentinel.DELETED


# Attribute names that NonCallableMock.__setattr__ hands straight to
# object.__setattr__.  _delegating_property() adds more names to this set.
_allowed_names = {
    'return_value', '_mock_return_value', 'side_effect',
    '_mock_side_effect', '_mock_parent', '_mock_new_parent',
    '_mock_name', '_mock_new_name'
}
327
328
def _delegating_property(name):
    """Create a property for *name* that defers to ``_mock_delegate``.

    With no delegate set, reads come from the ``_mock_<name>`` attribute
    and writes go to that key in the instance ``__dict__``.  With a
    delegate, both are forwarded to the delegate's *name* attribute.
    *name* is also registered in ``_allowed_names``.
    """
    _allowed_names.add(name)
    _the_name = '_mock_' + name

    def _get(self, name=name, _the_name=_the_name):
        delegate = self._mock_delegate
        if delegate is not None:
            return getattr(delegate, name)
        return getattr(self, _the_name)

    def _set(self, value, name=name, _the_name=_the_name):
        delegate = self._mock_delegate
        if delegate is not None:
            setattr(delegate, name, value)
            return
        self.__dict__[_the_name] = value

    return property(_get, _set)
345
346
347
348 class ESC[4;38;5;81m_CallList(ESC[4;38;5;149mlist):
349
350 def __contains__(self, value):
351 if not isinstance(value, list):
352 return list.__contains__(self, value)
353 len_value = len(value)
354 len_self = len(self)
355 if len_value > len_self:
356 return False
357
358 for i in range(0, len_self - len_value + 1):
359 sub_list = self[i:i+len_value]
360 if sub_list == value:
361 return True
362 return False
363
364 def __repr__(self):
365 return pprint.pformat(list(self))
366
367
def _check_and_set_parent(parent, value, name, new_name):
    """Adopt *value* as a child of *parent* if it is an unattached mock.

    Returns True when the parent and name fields were filled in, and False
    when *value* is not a mock, already has a name or parent, or appears in
    *parent*'s own ancestor chain.
    """
    value = _extract_mock(value)

    if not _is_instance_mock(value):
        return False
    if value._mock_name or value._mock_new_name:
        return False
    if value._mock_parent is not None:
        return False
    if value._mock_new_parent is not None:
        return False

    # setting a mock (value) as a child or return value of itself
    # should not modify the mock
    ancestor = parent
    while ancestor is not None:
        if ancestor is value:
            return False
        ancestor = ancestor._mock_new_parent

    if new_name:
        value._mock_new_parent = parent
        value._mock_new_name = new_name
    if name:
        value._mock_parent = parent
        value._mock_name = name
    return True
393
394 # Internal class to identify if we wrapped an iterator object or not.
395 class ESC[4;38;5;81m_MockIter(ESC[4;38;5;149mobject):
396 def __init__(self, obj):
397 self.obj = iter(obj)
398 def __next__(self):
399 return next(self.obj)
400
class Base:
    # Class-level defaults for the return value and side effect.
    _mock_return_value = DEFAULT
    _mock_side_effect = None

    def __init__(self, /, *args, **kwargs):
        # Accepts and ignores any arguments.
        pass
406
407
408
409 class ESC[4;38;5;81mNonCallableMock(ESC[4;38;5;149mBase):
410 """A non-callable version of `Mock`"""
411
# Store a mutex as a class attribute in order to protect concurrent access
# to mock attributes. Using a class attribute allows all NonCallableMock
# instances to share the mutex for simplicity.
#
# See https://github.com/python/cpython/issues/98624 for why this is
# necessary.
#
# __getattr__ holds this lock while it looks up or creates a child mock.
_lock = RLock()
419
def __new__(
        cls, spec=None, wraps=None, name=None, spec_set=None,
        parent=None, _spec_state=None, _new_name='', _new_parent=None,
        _spec_as_instance=False, _eat_self=None, unsafe=False, **kwargs
    ):
    # Each mock instance gets a class of its own, so magic methods can be
    # installed on the class without stomping on other mocks.
    wants_async_mixin = False
    if not issubclass(cls, AsyncMockMixin):
        # An async spec (object or function) pulls in AsyncMockMixin.
        candidate = spec_set or spec
        wants_async_mixin = (candidate is not None
                             and _is_async_obj(candidate))
    bases = (AsyncMockMixin, cls) if wants_async_mixin else (cls,)
    per_instance_class = type(cls.__name__, bases, {'__doc__': cls.__doc__})
    return _safe_super(NonCallableMock, cls).__new__(per_instance_class)
437
438
def __init__(
        self, spec=None, wraps=None, name=None, spec_set=None,
        parent=None, _spec_state=None, _new_name='', _new_parent=None,
        _spec_as_instance=False, _eat_self=None, unsafe=False, **kwargs
    ):
    # Initialise the mock's bookkeeping.  State is written directly into
    # the instance __dict__ rather than through attribute assignment, so
    # the class's own __setattr__ is not involved.
    if _new_parent is None:
        _new_parent = parent

    __dict__ = self.__dict__
    __dict__['_mock_parent'] = parent
    __dict__['_mock_name'] = name
    __dict__['_mock_new_name'] = _new_name
    __dict__['_mock_new_parent'] = _new_parent
    __dict__['_mock_sealed'] = False

    # spec_set doubles as the spec; from here on spec_set is just a flag.
    if spec_set is not None:
        spec = spec_set
        spec_set = True
    # By default the first argument is eaten only when there is a parent.
    if _eat_self is None:
        _eat_self = parent is not None

    self._mock_add_spec(spec, spec_set, _spec_as_instance, _eat_self)

    __dict__['_mock_children'] = {}
    __dict__['_mock_wraps'] = wraps
    __dict__['_mock_delegate'] = None

    # Backing storage for the delegating call-record properties.
    __dict__['_mock_called'] = False
    __dict__['_mock_call_args'] = None
    __dict__['_mock_call_count'] = 0
    __dict__['_mock_call_args_list'] = _CallList()
    __dict__['_mock_mock_calls'] = _CallList()

    __dict__['method_calls'] = _CallList()
    __dict__['_mock_unsafe'] = unsafe

    # Remaining keyword arguments configure attributes on the new mock.
    if kwargs:
        self.configure_mock(**kwargs)

    _safe_super(NonCallableMock, self).__init__(
        spec, wraps, name, spec_set, parent,
        _spec_state
    )
482
483
def attach_mock(self, mock, attribute):
    """
    Attach a mock as an attribute of this one, replacing its name and
    parent. Calls to the attached mock will be recorded in the
    `method_calls` and `mock_calls` attributes of this one."""
    inner_mock = _extract_mock(mock)

    # Clear the existing identity so the setattr below can adopt the mock
    # under its new name and parent.
    cleared_fields = (
        ('_mock_parent', None),
        ('_mock_new_parent', None),
        ('_mock_name', ''),
        ('_mock_new_name', None),
    )
    for field, blank in cleared_fields:
        setattr(inner_mock, field, blank)

    setattr(self, attribute, mock)
497
498
def mock_add_spec(self, spec, spec_set=False):
    """Apply *spec* to this mock.

    `spec` is either an object or a list of strings naming the attributes
    that may be fetched from the mock.  When `spec_set` is true, setting
    is restricted to attributes on the spec as well."""
    self._mock_add_spec(spec, spec_set)
506
507
def _mock_add_spec(self, spec, spec_set, _spec_as_instance=False,
                   _eat_self=False):
    """Record *spec* on this mock.

    *spec* may be None, a list/tuple of attribute names, or an arbitrary
    object.  For an object, its class, its call signature (when one can be
    obtained) and the names of its coroutine-function attributes are
    stored, and ``dir(spec)`` becomes the list of allowed attribute names.

    Raises InvalidSpecError when *spec* is itself a mock.
    """
    if _is_instance_mock(spec):
        raise InvalidSpecError(f'Cannot spec a Mock object. [object={spec!r}]')

    _spec_class = None
    _spec_signature = None
    _spec_asyncs = []

    if spec is not None and not _is_list(spec):
        if isinstance(spec, type):
            _spec_class = spec
        else:
            _spec_class = type(spec)
        res = _get_signature_object(spec,
                                    _spec_as_instance, _eat_self)
        _spec_signature = res and res[1]

        spec_list = dir(spec)

        for attr in spec_list:
            # Look the attribute up statically: a plain getattr() would
            # run properties and other descriptors on the spec, which can
            # have side effects or raise while the mock is being built.
            static_attr = inspect.getattr_static(spec, attr, None)
            # staticmethod/classmethod objects and decorated functions
            # expose the real function through __wrapped__.
            try:
                unwrapped_attr = inspect.unwrap(static_attr)
            except ValueError:
                # unwrap() reports a __wrapped__ cycle this way.
                unwrapped_attr = static_attr
            if iscoroutinefunction(unwrapped_attr):
                _spec_asyncs.append(attr)

        spec = spec_list

    __dict__ = self.__dict__
    __dict__['_spec_class'] = _spec_class
    __dict__['_spec_set'] = spec_set
    __dict__['_spec_signature'] = _spec_signature
    __dict__['_mock_methods'] = spec
    __dict__['_spec_asyncs'] = _spec_asyncs
540
def __get_return_value(self):
    # Prefer the delegate's value when a delegate is installed.
    value = self._mock_return_value
    delegate = self._mock_delegate
    if delegate is not None:
        value = delegate.return_value

    if value is not DEFAULT:
        return value

    # Nothing configured yet: create the child mock on first access and
    # store it through the property setter.
    child = self._get_child_mock(
        _new_parent=self, _new_name='()'
    )
    self.return_value = child
    return child


def __set_return_value(self, value):
    delegate = self._mock_delegate
    if delegate is not None:
        delegate.return_value = value
        return
    self._mock_return_value = value
    _check_and_set_parent(self, value, None, '()')

__return_value_doc = "The value to be returned when the mock is called."
return_value = property(__get_return_value, __set_return_value,
                        __return_value_doc)
564
565
566 @property
567 def __class__(self):
568 if self._spec_class is None:
569 return type(self)
570 return self._spec_class
571
# Call-record attributes.  Each is a property that reads and writes the
# matching `_mock_<name>` slot, or forwards to `_mock_delegate` when one is
# set (see _delegating_property).
called = _delegating_property('called')
call_count = _delegating_property('call_count')
call_args = _delegating_property('call_args')
call_args_list = _delegating_property('call_args_list')
mock_calls = _delegating_property('mock_calls')
577
578
def __get_side_effect(self):
    delegate = self._mock_delegate
    if delegate is None:
        return self._mock_side_effect

    effect = delegate.side_effect
    # A plain iterable set directly on the delegate is wrapped in a
    # _MockIter on first read and written back.
    already_usable = (effect is None or callable(effect)
                      or isinstance(effect, _MockIter)
                      or _is_exception(effect))
    if not already_usable:
        effect = _MockIter(effect)
        delegate.side_effect = effect
    return effect

def __set_side_effect(self, value):
    value = _try_iter(value)
    delegate = self._mock_delegate
    if delegate is not None:
        delegate.side_effect = value
        return
    self._mock_side_effect = value

side_effect = property(__get_side_effect, __set_side_effect)
599
600
def reset_mock(self, visited=None, *, return_value=False, side_effect=False):
    "Restore the mock object to its initial state."
    # `visited` holds the ids of mocks already reset, so that cycles among
    # child and return-value mocks terminate.
    if visited is None:
        visited = []
    own_id = id(self)
    if own_id in visited:
        return
    visited.append(own_id)

    self.called = False
    self.call_args = None
    self.call_count = 0
    self.mock_calls = _CallList()
    self.call_args_list = _CallList()
    self.method_calls = _CallList()

    if return_value:
        self._mock_return_value = DEFAULT
    if side_effect:
        self._mock_side_effect = None

    for child in self._mock_children.values():
        # Spec placeholders and deleted markers are not mocks.
        is_resettable = not (isinstance(child, _SpecState)
                             or child is _deleted)
        if is_resettable:
            child.reset_mock(visited, return_value=return_value,
                             side_effect=side_effect)

    returned = self._mock_return_value
    if _is_instance_mock(returned) and returned is not self:
        returned.reset_mock(visited)
629
630
def configure_mock(self, /, **kwargs):
    """Set attributes on the mock through keyword arguments.

    Dotted keys reach into child mocks, so attributes, return values and
    side effects of children can be configured by unpacking a dictionary
    in the call:

    >>> attrs = {'method.return_value': 3, 'other.side_effect': KeyError}
    >>> mock.configure_mock(**attrs)"""
    def depth(item):
        # Fewer dots first: an attribute is set before anything that is
        # set on that attribute.
        return item[0].count('.')

    for dotted_name, val in sorted(kwargs.items(), key=depth):
        *path, final = dotted_name.split('.')
        target = self
        for step in path:
            target = getattr(target, step)
        setattr(target, final, val)
651
652
def __getattr__(self, name):
    # Resolve an attribute that normal lookup did not find: reject names
    # the mock must not expose, otherwise return the child mock for `name`,
    # creating it on first access.
    if name in {'_mock_methods', '_mock_unsafe'}:
        # The checks below read these two attributes themselves, so a
        # missing one must fail here rather than re-enter this method.
        raise AttributeError(name)
    elif self._mock_methods is not None:
        # With a spec, only names on the spec are allowed, and magic
        # methods are never produced here.
        if name not in self._mock_methods or name in _all_magics:
            raise AttributeError("Mock object has no attribute %r" % name)
    elif _is_magic(name):
        raise AttributeError(name)
    if not self._mock_unsafe and (not self._mock_methods or name not in self._mock_methods):
        # Safe mode: names that look like (possibly misspelled) assertions
        # raise instead of silently becoming child mocks.
        if name.startswith(('assert', 'assret', 'asert', 'aseert', 'assrt')) or name in _ATTRIB_DENY_LIST:
            raise AttributeError(
                f"{name!r} is not a valid assertion. Use a spec "
                f"for the mock if {name!r} is meant to be an attribute.")

    with NonCallableMock._lock:
        result = self._mock_children.get(name)
        if result is _deleted:
            # The attribute was removed with delattr().
            raise AttributeError(name)
        elif result is None:
            wraps = None
            if self._mock_wraps is not None:
                # XXXX should we get the attribute without triggering code
                # execution?
                wraps = getattr(self._mock_wraps, name)

            result = self._get_child_mock(
                parent=self, name=name, wraps=wraps, _new_name=name,
                _new_parent=self
            )
            self._mock_children[name] = result

        elif isinstance(result, _SpecState):
            # A spec placeholder: build the autospecced child now and
            # store it in place of the placeholder.
            try:
                result = create_autospec(
                    result.spec, result.spec_set, result.instance,
                    result.parent, result.name
                )
            except InvalidSpecError:
                target_name = self.__dict__['_mock_name'] or self
                raise InvalidSpecError(
                    f'Cannot autospec attr {name!r} from target '
                    f'{target_name!r} as it has already been mocked out. '
                    f'[target={self!r}, attr={result.spec!r}]')
            self._mock_children[name] = result

    return result
698 return result
699
700
701 def _extract_mock_name(self):
702 _name_list = [self._mock_new_name]
703 _parent = self._mock_new_parent
704 last = self
705
706 dot = '.'
707 if _name_list == ['()']:
708 dot = ''
709
710 while _parent is not None:
711 last = _parent
712
713 _name_list.append(_parent._mock_new_name + dot)
714 dot = '.'
715 if _parent._mock_new_name == '()':
716 dot = ''
717
718 _parent = _parent._mock_new_parent
719
720 _name_list = list(reversed(_name_list))
721 _first = last._mock_name or 'mock'
722 if len(_name_list) > 1:
723 if _name_list[1] not in ('()', '().'):
724 _first += '.'
725 _name_list[0] = _first
726 return ''.join(_name_list)
727
728 def __repr__(self):
729 name = self._extract_mock_name()
730
731 name_string = ''
732 if name not in ('mock', 'mock.'):
733 name_string = ' name=%r' % name
734
735 spec_string = ''
736 if self._spec_class is not None:
737 spec_string = ' spec=%r'
738 if self._spec_set:
739 spec_string = ' spec_set=%r'
740 spec_string = spec_string % self._spec_class.__name__
741 return "<%s%s%s id='%s'>" % (
742 type(self).__name__,
743 name_string,
744 spec_string,
745 id(self)
746 )
747
748
def __dir__(self):
    """Filter the output of `dir(mock)` to only useful members.

    With FILTER_DIR off, the unfiltered ``object.__dir__`` result is
    returned.  Otherwise the result is the sorted union of the spec names,
    the public names on the mock's type, the public or magic names in the
    instance ``__dict__``, and the names of child mocks that have not been
    deleted.
    """
    if not FILTER_DIR:
        return object.__dir__(self)

    # A spec given as a tuple of names is stored as that tuple; make it a
    # list so the concatenation below cannot fail with a TypeError.
    extras = list(self._mock_methods or [])
    from_type = dir(type(self))
    from_dict = list(self.__dict__)
    from_child_mocks = [
        m_name for m_name, m_value in self._mock_children.items()
        if m_value is not _deleted]

    from_type = [e for e in from_type if not e.startswith('_')]
    from_dict = [e for e in from_dict if not e.startswith('_') or
                 _is_magic(e)]
    return sorted(set(extras + from_type + from_dict + from_child_mocks))
764 return sorted(set(extras + from_type + from_dict + from_child_mocks))
765
766
def __setattr__(self, name, value):
    # Route an attribute assignment: enforce spec_set restrictions, install
    # magic methods on the per-instance class, adopt mock values as
    # children, and honour sealing.
    if name in _allowed_names:
        # property setters go through here
        return object.__setattr__(self, name, value)
    elif (self._spec_set and self._mock_methods is not None and
        name not in self._mock_methods and
        name not in self.__dict__):
        # spec_set: only names on the spec, or already present on the
        # instance, may be assigned.
        raise AttributeError("Mock object has no attribute '%s'" % name)
    elif name in _unsupported_magics:
        msg = 'Attempting to set unsupported magic method %r.' % name
        raise AttributeError(msg)
    elif name in _all_magics:
        if self._mock_methods is not None and name not in self._mock_methods:
            raise AttributeError("Mock object has no attribute '%s'" % name)

        if not _is_instance_mock(value):
            # A non-mock value is installed on the type via _get_method;
            # the instance attribute becomes a wrapper that passes this
            # mock as the first argument.
            setattr(type(self), name, _get_method(name, value))
            original = value
            value = lambda *args, **kw: original(self, *args, **kw)
        else:
            # only set _new_name and not name so that mock_calls is tracked
            # but not method calls
            _check_and_set_parent(self, value, None, name)
            setattr(type(self), name, value)
            self._mock_children[name] = value
    elif name == '__class__':
        # Assigning __class__ changes the reported spec class only.
        self._spec_class = value
        return
    else:
        if _check_and_set_parent(self, value, name, name):
            self._mock_children[name] = value

    # A sealed mock refuses attributes it does not already have.
    if self._mock_sealed and not hasattr(self, name):
        mock_name = f'{self._extract_mock_name()}.{name}'
        raise AttributeError(f'Cannot set {mock_name}')

    return object.__setattr__(self, name, value)
803 return object.__setattr__(self, name, value)
804
805
def __delattr__(self, name):
    # Remove `name` and leave a _deleted marker so later lookups raise
    # AttributeError instead of creating a fresh child mock.
    own_type = type(self)
    if name in _all_magics and name in own_type.__dict__:
        delattr(own_type, name)
        if name not in self.__dict__:
            # for magic methods that are still MagicProxy objects and
            # not set on the instance itself
            return

    children = self._mock_children
    child = children.get(name, _missing)
    if name in self.__dict__:
        _safe_super(NonCallableMock, self).__delattr__(name)
    elif child is _deleted:
        # Already deleted once.
        raise AttributeError(name)
    if child is not _missing:
        # Drop the old entry first so the marker is inserted afresh.
        children.pop(name, None)
    children[name] = _deleted
821 self._mock_children[name] = _deleted
822
823
def _format_mock_call_signature(self, args, kwargs):
    # Render a call using this mock's name, or 'mock' when it has none.
    return _format_call_signature(self._mock_name or 'mock', args, kwargs)
826 return _format_call_signature(name, args, kwargs)
827
828
829 def _format_mock_failure_message(self, args, kwargs, action='call'):
830 message = 'expected %s not found.\nExpected: %s\nActual: %s'
831 expected_string = self._format_mock_call_signature(args, kwargs)
832 call_args = self.call_args
833 actual_string = self._format_mock_call_signature(*call_args)
834 return message % (action, expected_string, actual_string)
835
836
837 def _get_call_signature_from_name(self, name):
838 """
839 * If call objects are asserted against a method/function like obj.meth1
840 then there could be no name for the call object to lookup. Hence just
841 return the spec_signature of the method/function being asserted against.
842 * If the name is not empty then remove () and split by '.' to get
843 list of names to iterate through the children until a potential
844 match is found. A child mock is created only during attribute access
845 so if we get a _SpecState then no attributes of the spec were accessed
846 and can be safely exited.
847 """
848 if not name:
849 return self._spec_signature
850
851 sig = None
852 names = name.replace('()', '').split('.')
853 children = self._mock_children
854
855 for name in names:
856 child = children.get(name)
857 if child is None or isinstance(child, _SpecState):
858 break
859 else:
860 # If an autospecced object is attached using attach_mock the
861 # child would be a function with mock object as attribute from
862 # which signature has to be derived.
863 child = _extract_mock(child)
864 children = child._mock_children
865 sig = child._spec_signature
866
867 return sig
868
869
870 def _call_matcher(self, _call):
871 """
872 Given a call (or simply an (args, kwargs) tuple), return a
873 comparison key suitable for matching with other calls.
874 This is a best effort method which relies on the spec's signature,
875 if available, or falls back on the arguments themselves.
876 """
877
878 if isinstance(_call, tuple) and len(_call) > 2:
879 sig = self._get_call_signature_from_name(_call[0])
880 else:
881 sig = self._spec_signature
882
883 if sig is not None:
884 if len(_call) == 2:
885 name = ''
886 args, kwargs = _call
887 else:
888 name, args, kwargs = _call
889 try:
890 bound_call = sig.bind(*args, **kwargs)
891 return call(name, bound_call.args, bound_call.kwargs)
892 except TypeError as e:
893 return e.with_traceback(None)
894 else:
895 return _call
896
def assert_not_called(self):
    """Assert that the mock has never been called."""
    if self.call_count != 0:
        raise AssertionError(
            "Expected '%s' to not have been called. Called %s times.%s"
            % (self._mock_name or 'mock',
               self.call_count,
               self._calls_repr()))
905 raise AssertionError(msg)
906
def assert_called(self):
    """Assert that the mock has been called at least once."""
    if self.call_count == 0:
        display_name = self._mock_name or 'mock'
        raise AssertionError(
            "Expected '%s' to have been called." % (display_name,))
913 raise AssertionError(msg)
914
def assert_called_once(self):
    """Assert that the mock has been called exactly once."""
    if not self.call_count == 1:
        raise AssertionError(
            "Expected '%s' to have been called once. Called %s times.%s"
            % (self._mock_name or 'mock',
               self.call_count,
               self._calls_repr()))
923 raise AssertionError(msg)
924
def assert_called_with(self, /, *args, **kwargs):
    """Assert that the most recent call used the given arguments.

    Raises an AssertionError if the mock was never called, or if the args
    and keyword args differ from those of the last call."""
    if self.call_args is None:
        expected_text = self._format_mock_call_signature(args, kwargs)
        raise AssertionError(
            'expected call not found.\nExpected: %s\nActual: %s'
            % (expected_text, 'not called.'))

    expected = self._call_matcher(_Call((args, kwargs), two=True))
    actual = self._call_matcher(self.call_args)
    if actual != expected:
        # A binding error for the expected call becomes the cause.
        cause = expected if isinstance(expected, Exception) else None
        failure_text = self._format_mock_failure_message(args, kwargs)
        raise AssertionError(failure_text) from cause
944 raise AssertionError(_error_message()) from cause
945
946
def assert_called_once_with(self, /, *args, **kwargs):
    """Assert that the mock was called exactly once, and that the call
    used the given arguments."""
    if not self.call_count == 1:
        raise AssertionError(
            "Expected '%s' to be called once. Called %s times.%s"
            % (self._mock_name or 'mock',
               self.call_count,
               self._calls_repr()))
    return self.assert_called_with(*args, **kwargs)
956 return self.assert_called_with(*args, **kwargs)
957
958
def assert_has_calls(self, calls, any_order=False):
    """Assert that the mock has been called with the specified calls.
    The `mock_calls` list is checked for the calls.

    With `any_order` false (the default) the calls must appear as one
    consecutive run; extra calls before or after that run are allowed.

    With `any_order` true the calls may be in any order, but every one of
    them must appear in `mock_calls`."""
    expected = [self._call_matcher(c) for c in calls]
    # The first expected call that failed to bind, if any.
    cause = next((e for e in expected if isinstance(e, Exception)), None)
    all_calls = _CallList(self._call_matcher(c) for c in self.mock_calls)

    if any_order:
        remaining = list(all_calls)
        not_found = []
        for kall in expected:
            try:
                remaining.remove(kall)
            except ValueError:
                not_found.append(kall)
        if not_found:
            raise AssertionError(
                '%r does not contain all of %r in its call list, '
                'found %r instead' % (self._mock_name or 'mock',
                                      tuple(not_found), remaining)
            ) from cause
        return

    if expected in all_calls:
        return

    if cause is None:
        problem = 'Calls not found.'
    else:
        errors = [e if isinstance(e, Exception) else None
                  for e in expected]
        problem = ('Error processing expected calls.\n'
                   'Errors: {}').format(errors)
    raise AssertionError(
        f'{problem}\n'
        f'Expected: {_CallList(calls)}'
        f'{self._calls_repr(prefix="Actual").rstrip(".")}'
    ) from cause
1001 ) from cause
1002
1003
def assert_any_call(self, /, *args, **kwargs):
    """Assert that the mock has been called with the given arguments.

    Passes if the mock has *ever* been called that way, unlike
    `assert_called_with` and `assert_called_once_with`, which only look at
    the most recent call."""
    expected = self._call_matcher(_Call((args, kwargs), two=True))
    cause = expected if isinstance(expected, Exception) else None
    recorded = [self._call_matcher(c) for c in self.call_args_list]
    if not cause and expected in _AnyComparer(recorded):
        return
    expected_string = self._format_mock_call_signature(args, kwargs)
    raise AssertionError(
        '%s call not found' % expected_string
    ) from cause
1017 ) from cause
1018
1019
def _get_child_mock(self, /, **kw):
    """Create the mock used for a child attribute or for the return value.

    Child mocks normally have the same type as their parent; subclasses of
    Mock can override this method to customise how children are built.

    Non-callable mocks produce the callable variant (`Mock` / `MagicMock`)
    rather than any custom subclass.  Raises `AttributeError` when this
    mock has been sealed.
    """
    if self._mock_sealed:
        suffix = f".{kw['name']}" if "name" in kw else "()"
        raise AttributeError(self._extract_mock_name() + suffix)

    child_name = kw.get("_new_name")
    if child_name in self.__dict__['_spec_asyncs']:
        return AsyncMock(**kw)

    own_type = type(self)
    if issubclass(own_type, MagicMock) and child_name in _async_method_magics:
        # Any asynchronous magic becomes an AsyncMock
        klass = AsyncMock
    elif issubclass(own_type, AsyncMockMixin):
        # Any synchronous method on AsyncMock becomes a MagicMock
        is_sync = (child_name in _all_sync_magics or
                   self._mock_methods and child_name in self._mock_methods)
        klass = MagicMock if is_sync else AsyncMock
    elif issubclass(own_type, CallableMixin):
        klass = own_type.__mro__[1]
    elif issubclass(own_type, NonCallableMagicMock):
        klass = MagicMock
    elif issubclass(own_type, NonCallableMock):
        klass = Mock
    return klass(**kw)
1056
1057
def _calls_repr(self, prefix="Calls"):
    """Render `self.mock_calls` as a fragment for assertion messages.

    The fragment is a newline, then `prefix`, a colon, the `safe_repr` of
    the recorded calls and a closing full stop, for example (after the
    newline) ``Calls: [call(1), call(2)].``

    An empty string is returned when no calls were recorded.
    """
    recorded = self.mock_calls
    if recorded:
        return f"\n{prefix}: {safe_repr(recorded)}."
    return ""
1069
1070
# Attribute names that are forbidden in safe mode: the name of every
# `assert_*` method of NonCallableMock with the "assert_" prefix removed.
_ATTRIB_DENY_LIST = frozenset(
    attr.removeprefix("assert_")
    for attr in dir(NonCallableMock)
    if attr.startswith("assert_")
)
1077
1078
class _AnyComparer(list):
    """A list of calls with a component-wise membership test.

    `item in comparer` compares `item` against each stored call one
    component at a time, with the component of `item` on the left-hand side
    of `==`.  Swapping the usual operand order this way guarantees that an
    `ANY` argument in the expected call is the left operand.
    """
    def __contains__(self, item):
        for candidate in self:
            assert len(item) == len(candidate)
            # Build the full list so every pair is compared (no short-circuit).
            outcomes = [
                wanted == seen
                for wanted, seen in zip(item, candidate)
            ]
            if all(outcomes):
                return True
        return False
1093
1094
def _try_iter(obj):
    """Return an iterator over `obj` when that makes sense.

    `None`, exceptions (classes or instances) and callables are returned
    unchanged.  Anything else is passed to `iter()`; if it turns out not to
    be iterable the object itself is returned.
    """
    if obj is None or _is_exception(obj) or _callable(obj):
        return obj
    try:
        return iter(obj)
    except TypeError:
        # XXXX backwards compatibility
        # but this will blow up on first call - so maybe we should fail early?
        return obj
1108
1109
class CallableMixin(Base):
    """Mixin that makes a mock callable.

    Calling the mock checks the signature, records the call on the mock and
    on its ancestors, and then produces a result from `side_effect`,
    `return_value` or the wrapped object.
    """

    def __init__(self, spec=None, side_effect=None, return_value=DEFAULT,
                 wraps=None, name=None, spec_set=None, parent=None,
                 _spec_state=None, _new_name='', _new_parent=None, **kwargs):
        # Written straight into the instance dict, before the base
        # initialiser runs.
        self.__dict__['_mock_return_value'] = return_value
        _safe_super(CallableMixin, self).__init__(
            spec, wraps, name, spec_set, parent,
            _spec_state, _new_name, _new_parent, **kwargs
        )

        self.side_effect = side_effect


    def _mock_check_sig(self, /, *args, **kwargs):
        """Stub that can be replaced with a check for a specific signature."""


    def __call__(self, /, *args, **kwargs):
        # can't use self in-case a function / method we are mocking uses self
        # in the signature
        self._mock_check_sig(*args, **kwargs)
        self._increment_mock_call(*args, **kwargs)
        return self._mock_call(*args, **kwargs)


    def _mock_call(self, /, *args, **kwargs):
        """Produce the result of a call that has already been recorded."""
        return self._execute_mock_call(*args, **kwargs)

    def _increment_mock_call(self, /, *args, **kwargs):
        """Record one call on this mock and on every ancestor mock."""
        self.called = True
        self.call_count += 1

        # call_args has to be set here so that assertions on the call
        # arguments pass before execution in the case of awaited calls
        recorded = _Call((args, kwargs), two=True)
        self.call_args = recorded
        self.call_args_list.append(recorded)

        # state for method_calls bookkeeping
        track_method_calls = self._mock_parent is not None
        method_name = self._mock_name

        # state for mock_calls bookkeeping
        call_name = self._mock_new_name
        child_is_call = call_name == '()'
        self.mock_calls.append(_Call(('', args, kwargs)))

        # walk up the chain of mocks
        ancestor = self._mock_new_parent
        while ancestor is not None:

            # method_calls: recorded while the `_mock_parent` links continue
            if track_method_calls:
                ancestor.method_calls.append(
                    _Call((method_name, args, kwargs)))
                track_method_calls = ancestor._mock_parent is not None
                if track_method_calls:
                    method_name = ancestor._mock_name + '.' + method_name

            # mock_calls: always recorded
            ancestor.mock_calls.append(_Call((call_name, args, kwargs)))

            if ancestor._mock_new_name:
                # no dot between a name and the '()' that follows it
                separator = '' if child_is_call else '.'
                child_is_call = ancestor._mock_new_name == '()'
                call_name = ancestor._mock_new_name + separator + call_name

            # continue with the next ancestor
            ancestor = ancestor._mock_new_parent

    def _execute_mock_call(self, /, *args, **kwargs):
        """Compute the result of a call.

        Kept apart from `_increment_mock_call` so that awaited functions
        are executed separately from their call; AsyncMock overrides this
        method.
        """
        effect = self.side_effect
        if effect is not None:
            if _is_exception(effect):
                raise effect
            if _callable(effect):
                outcome = effect(*args, **kwargs)
            else:
                # not callable: treated as an iterator of results
                outcome = next(effect)
                if _is_exception(outcome):
                    raise outcome

            if outcome is not DEFAULT:
                return outcome

        # An explicitly configured return value takes precedence over the
        # wrapped object.
        if self._mock_return_value is DEFAULT and self._mock_wraps is not None:
            return self._mock_wraps(*args, **kwargs)

        return self.return_value
1211
1212
1213
class Mock(CallableMixin, NonCallableMock):
    """
    Create a new `Mock` object. `Mock` takes several optional arguments
    that specify the behaviour of the Mock object:

    * `spec`: This can be either a list of strings or an existing object (a
      class or instance) that acts as the specification for the mock object. If
      you pass in an object then a list of strings is formed by calling dir on
      the object (excluding unsupported magic attributes and methods). Accessing
      any attribute not in this list will raise an `AttributeError`.

      If `spec` is an object (rather than a list of strings) then
      `mock.__class__` returns the class of the spec object. This allows mocks
      to pass `isinstance` tests.

    * `spec_set`: A stricter variant of `spec`. If used, attempting to *set*
      or get an attribute on the mock that isn't on the object passed as
      `spec_set` will raise an `AttributeError`.

    * `side_effect`: A function to be called whenever the Mock is called. See
      the `side_effect` attribute. Useful for raising exceptions or
      dynamically changing return values. The function is called with the same
      arguments as the mock, and unless it returns `DEFAULT`, the return
      value of this function is used as the return value.

      If `side_effect` is an exception class or instance then it is raised
      when the mock is called.

      If `side_effect` is an iterable then each call to the mock will return
      the next value from the iterable. If any of the members of the iterable
      are exceptions they will be raised instead of returned.

    * `return_value`: The value returned when the mock is called. By default
      this is a new Mock (created on first access). See the
      `return_value` attribute.

    * `unsafe`: By default, accessing any attribute whose name starts with
      *assert*, *assret*, *asert*, *aseert*, or *assrt* raises an AttributeError.
      Additionally, an AttributeError is raised when accessing
      attributes that match the name of an assertion method without the prefix
      `assert_`, e.g. accessing `called_once` instead of `assert_called_once`.
      Passing `unsafe=True` will allow access to these attributes.

    * `wraps`: Item for the mock object to wrap. If `wraps` is not None then
      calling the Mock will pass the call through to the wrapped object
      (returning the real result). Attribute access on the mock will return a
      Mock object that wraps the corresponding attribute of the wrapped object
      (so attempting to access an attribute that doesn't exist will raise an
      `AttributeError`).

      If the mock has an explicit `return_value` set then calls are not passed
      to the wrapped object and the `return_value` is returned instead.

    * `name`: If the mock has a name then it will be used in the repr of the
      mock. This can be useful for debugging. The name is propagated to child
      mocks.

    Mocks can also be called with arbitrary keyword arguments. These will be
    used to set attributes on the mock after it is created.
    """
1271
1272
# Guard for commands like patch: inspects the keyword arguments they were
# given and rejects common misspellings of the autospeccing arguments.
def _check_spec_arg_typos(kwargs_to_check):
    """Raise RuntimeError if `kwargs_to_check` contains a known misspelling.

    The candidates are tried in a fixed order and the first one found is
    the one named in the error message.
    """
    known_typos = ("autospect", "auto_spec", "set_spec")
    found = next(
        (candidate for candidate in known_typos
         if candidate in kwargs_to_check),
        None,
    )
    if found is not None:
        raise RuntimeError(
            f"{found!r} might be a typo; use unsafe=True if this is intended"
        )
1282
1283
class _patch(object):
    """Replace one attribute of a target object for a limited time.

    Instances are built by `patch`, `patch.object` and `patch.multiple`.  A
    patcher can be used as a context manager, applied as a decorator to a
    function, coroutine function or class, or driven by hand through
    `start()` and `stop()`.
    """

    # Set by patch.multiple to the name of the patched attribute.  When it is
    # not None the created mocks are handed over by keyword (in a dict)
    # rather than positionally.
    attribute_name = None
    # Patchers activated through start(); a single list shared by all
    # patchers so that patch.stopall() can reach them.
    _active_patches = []

    def __init__(
            self, getter, attribute, new, spec, create,
            spec_set, autospec, new_callable, kwargs, *, unsafe=False
        ):
        """Validate the argument combination and remember the configuration.

        `getter` is a zero-argument callable returning the object to patch
        and `attribute` is the name to replace on it.  The other arguments
        have the meaning documented for `patch`; `kwargs` is the dict of
        extra keyword arguments for the mock that gets created.

        Raises ValueError when `new_callable` is combined with `new` or
        `autospec`, RuntimeError when `kwargs` contains a likely misspelling
        of a spec argument (unless `unsafe` is true), and InvalidSpecError
        when `spec` or `spec_set` is itself a mock.
        """
        if new_callable is not None:
            if new is not DEFAULT:
                raise ValueError(
                    "Cannot use 'new' and 'new_callable' together"
                )
            if autospec is not None:
                raise ValueError(
                    "Cannot use 'autospec' and 'new_callable' together"
                )
        if not unsafe:
            _check_spec_arg_typos(kwargs)
        if _is_instance_mock(spec):
            raise InvalidSpecError(
                f'Cannot spec attr {attribute!r} as the spec '
                f'has already been mocked out. [spec={spec!r}]')
        if _is_instance_mock(spec_set):
            raise InvalidSpecError(
                f'Cannot spec attr {attribute!r} as the spec_set '
                f'target has already been mocked out. [spec_set={spec_set!r}]')

        self.getter = getter
        self.attribute = attribute
        self.new = new
        self.new_callable = new_callable
        self.spec = spec
        self.create = create
        self.has_local = False
        self.spec_set = spec_set
        self.autospec = autospec
        self.kwargs = kwargs
        # Remembered so that copy() can rebuild the patcher with the same
        # typo-check setting.
        self.unsafe = unsafe
        self.additional_patchers = []


    def copy(self):
        """Return a new, independent patcher with the same configuration.

        The `unsafe` flag is forwarded: without it a patcher created with
        `unsafe=True` and a keyword argument that looks like a typo would
        fail the typo check again here (for example when used as a class
        decorator, which copies the patcher for every test method).
        """
        patcher = _patch(
            self.getter, self.attribute, self.new, self.spec,
            self.create, self.spec_set,
            self.autospec, self.new_callable, self.kwargs,
            unsafe=self.unsafe,
        )
        patcher.attribute_name = self.attribute_name
        patcher.additional_patchers = [
            p.copy() for p in self.additional_patchers
        ]
        return patcher


    def __call__(self, func):
        """Apply the patcher as a decorator to a class or (async) callable."""
        if isinstance(func, type):
            return self.decorate_class(func)
        if inspect.iscoroutinefunction(func):
            return self.decorate_async_callable(func)
        return self.decorate_callable(func)


    def decorate_class(self, klass):
        """Decorate every callable attribute whose name starts with
        `patch.TEST_PREFIX`, each with its own copy of this patcher.

        The class is modified in place and returned.
        """
        for attr in dir(klass):
            if not attr.startswith(patch.TEST_PREFIX):
                continue

            attr_value = getattr(klass, attr)
            if not hasattr(attr_value, "__call__"):
                continue

            patcher = self.copy()
            setattr(klass, attr, patcher(attr_value))
        return klass


    @contextlib.contextmanager
    def decoration_helper(self, patched, args, keywargs):
        """Enter all patchings of `patched` and yield the call arguments.

        Yields `(args, keywargs)` extended with the created mocks: patchers
        with an `attribute_name` contribute keyword arguments, the others
        contribute a positional argument when they created the mock
        themselves (`new` is DEFAULT).  All patchings are undone when the
        context exits.
        """
        extra_args = []
        with contextlib.ExitStack() as exit_stack:
            for patching in patched.patchings:
                arg = exit_stack.enter_context(patching)
                if patching.attribute_name is not None:
                    keywargs.update(arg)
                elif patching.new is DEFAULT:
                    extra_args.append(arg)

            args += tuple(extra_args)
            yield (args, keywargs)


    def decorate_callable(self, func):
        """Wrap `func` so that it runs with the patch applied."""
        # NB. Keep the method in sync with decorate_async_callable()
        if hasattr(func, 'patchings'):
            # Already wrapped by another patcher: join its list instead of
            # adding a second wrapper.
            func.patchings.append(self)
            return func

        @wraps(func)
        def patched(*args, **keywargs):
            with self.decoration_helper(patched,
                                        args,
                                        keywargs) as (newargs, newkeywargs):
                return func(*newargs, **newkeywargs)

        patched.patchings = [self]
        return patched


    def decorate_async_callable(self, func):
        """Wrap coroutine function `func` so that it runs with the patch
        applied."""
        # NB. Keep the method in sync with decorate_callable()
        if hasattr(func, 'patchings'):
            # Already wrapped by another patcher: join its list instead of
            # adding a second wrapper.
            func.patchings.append(self)
            return func

        @wraps(func)
        async def patched(*args, **keywargs):
            with self.decoration_helper(patched,
                                        args,
                                        keywargs) as (newargs, newkeywargs):
                return await func(*newargs, **newkeywargs)

        patched.patchings = [self]
        return patched


    def get_original(self):
        """Return `(original, local)` for the attribute about to be patched.

        `original` is the current value of the attribute, or DEFAULT when it
        does not exist; `local` is true when the value was found in the
        target's own `__dict__`.  Patching the name of a builtin on a module
        switches `create` on.

        Raises AttributeError when the attribute is missing and `create` is
        false.
        """
        target = self.getter()
        name = self.attribute

        original = DEFAULT
        local = False

        try:
            original = target.__dict__[name]
        except (AttributeError, KeyError):
            original = getattr(target, name, DEFAULT)
        else:
            local = True

        if name in _builtins and isinstance(target, ModuleType):
            self.create = True

        if not self.create and original is DEFAULT:
            raise AttributeError(
                "%s does not have the attribute %r" % (target, name)
            )
        return original, local


    def __enter__(self):
        """Perform the patch.

        Returns the object put in place, or, when `attribute_name` is set, a
        dict of the created mocks keyed by attribute name.
        """
        new, spec, spec_set = self.new, self.spec, self.spec_set
        autospec, kwargs = self.autospec, self.kwargs
        new_callable = self.new_callable
        self.target = self.getter()

        # normalise False to None
        if spec is False:
            spec = None
        if spec_set is False:
            spec_set = None
        if autospec is False:
            autospec = None

        if spec is not None and autospec is not None:
            raise TypeError("Can't specify spec and autospec")
        if ((spec is not None or autospec is not None) and
            spec_set not in (True, None)):
            raise TypeError("Can't provide explicit spec_set *and* spec or autospec")

        original, local = self.get_original()

        if new is DEFAULT and autospec is None:
            # We create the replacement ourselves.
            inherit = False
            if spec is True:
                # set spec to the object we are replacing
                spec = original
                if spec_set is True:
                    spec_set = original
                    spec = None
            elif spec is not None:
                if spec_set is True:
                    spec_set = spec
                    spec = None
            elif spec_set is True:
                spec_set = original

            if spec is not None or spec_set is not None:
                if original is DEFAULT:
                    raise TypeError("Can't use 'spec' with create=True")
                if isinstance(original, type):
                    # If we're patching out a class and there is a spec
                    inherit = True
            if spec is None and _is_async_obj(original):
                Klass = AsyncMock
            else:
                Klass = MagicMock
            _kwargs = {}
            if new_callable is not None:
                Klass = new_callable
            elif spec is not None or spec_set is not None:
                this_spec = spec
                if spec_set is not None:
                    this_spec = spec_set
                if _is_list(this_spec):
                    not_callable = '__call__' not in this_spec
                else:
                    not_callable = not callable(this_spec)
                if _is_async_obj(this_spec):
                    Klass = AsyncMock
                elif not_callable:
                    Klass = NonCallableMagicMock

            if spec is not None:
                _kwargs['spec'] = spec
            if spec_set is not None:
                _kwargs['spec_set'] = spec_set

            # add a name to mocks
            if (isinstance(Klass, type) and
                issubclass(Klass, NonCallableMock) and self.attribute):
                _kwargs['name'] = self.attribute

            _kwargs.update(kwargs)
            new = Klass(**_kwargs)

            if inherit and _is_instance_mock(new):
                # we can only tell if the instance should be callable if the
                # spec is not a list
                this_spec = spec
                if spec_set is not None:
                    this_spec = spec_set
                if (not _is_list(this_spec) and not
                    _instance_callable(this_spec)):
                    Klass = NonCallableMagicMock

                _kwargs.pop('name')
                new.return_value = Klass(_new_parent=new, _new_name='()',
                                         **_kwargs)
        elif autospec is not None:
            # spec is ignored, new *must* be default, spec_set is treated
            # as a boolean. Should we check spec is not None and that spec_set
            # is a bool?
            if new is not DEFAULT:
                raise TypeError(
                    "autospec creates the mock for you. Can't specify "
                    "autospec and new."
                )
            if original is DEFAULT:
                raise TypeError("Can't use 'autospec' with create=True")
            spec_set = bool(spec_set)
            if autospec is True:
                autospec = original

            if _is_instance_mock(self.target):
                raise InvalidSpecError(
                    f'Cannot autospec attr {self.attribute!r} as the patch '
                    f'target has already been mocked out. '
                    f'[target={self.target!r}, attr={autospec!r}]')
            if _is_instance_mock(autospec):
                target_name = getattr(self.target, '__name__', self.target)
                raise InvalidSpecError(
                    f'Cannot autospec attr {self.attribute!r} from target '
                    f'{target_name!r} as it has already been mocked out. '
                    f'[target={self.target!r}, attr={autospec!r}]')

            new = create_autospec(autospec, spec_set=spec_set,
                                  _name=self.attribute, **kwargs)
        elif kwargs:
            # can't set keyword args when we aren't creating the mock
            # XXXX If new is a Mock we could call new.configure_mock(**kwargs)
            raise TypeError("Can't pass kwargs to a mock we aren't creating")

        new_attr = new

        # State needed by __exit__ to undo the patch.
        self.temp_original = original
        self.is_local = local
        self._exit_stack = contextlib.ExitStack()
        try:
            setattr(self.target, self.attribute, new_attr)
            if self.attribute_name is not None:
                extra_args = {}
                if self.new is DEFAULT:
                    extra_args[self.attribute_name] = new
                for patching in self.additional_patchers:
                    arg = self._exit_stack.enter_context(patching)
                    if patching.new is DEFAULT:
                        extra_args.update(arg)
                return extra_args

            return new
        except:
            # Undo whatever was applied; re-raise unless __exit__ reports the
            # exception as handled.
            if not self.__exit__(*sys.exc_info()):
                raise

    def __exit__(self, *exc_info):
        """Undo the patch."""
        if self.is_local and self.temp_original is not DEFAULT:
            setattr(self.target, self.attribute, self.temp_original)
        else:
            delattr(self.target, self.attribute)
            if not self.create and (not hasattr(self.target, self.attribute) or
                        self.attribute in ('__doc__', '__module__',
                                           '__defaults__', '__annotations__',
                                           '__kwdefaults__')):
                # needed for proxy objects like django settings
                setattr(self.target, self.attribute, self.temp_original)

        del self.temp_original
        del self.is_local
        del self.target
        exit_stack = self._exit_stack
        del self._exit_stack
        # Leaving the stack undoes the additional patchers entered in
        # __enter__.
        return exit_stack.__exit__(*exc_info)


    def start(self):
        """Activate a patch, returning any created mock."""
        result = self.__enter__()
        self._active_patches.append(self)
        return result


    def stop(self):
        """Stop an active patch."""
        try:
            self._active_patches.remove(self)
        except ValueError:
            # If the patch hasn't been started this will fail
            return None

        return self.__exit__(None, None, None)
1617
1618
1619
def _get_target(target):
    """Split a dotted patch target into `(getter, attribute)`.

    Everything before the last dot names the object to patch; `getter` is a
    zero-argument callable that resolves it with `pkgutil.resolve_name` when
    called.  `attribute` is the part after the last dot.

    Raises TypeError when `target` cannot be split this way.
    """
    try:
        owner_path, attribute = target.rsplit('.', 1)
    except (TypeError, ValueError, AttributeError):
        raise TypeError(
            f"Need a valid target to patch. You supplied: {target!r}")
    resolve_owner = partial(pkgutil.resolve_name, owner_path)
    return resolve_owner, attribute
1627
1628
def _patch_object(
        target, attribute, new=DEFAULT, spec=None,
        create=False, spec_set=None, autospec=None,
        new_callable=None, *, unsafe=False, **kwargs
    ):
    """
    Patch the named member (`attribute`) of an object (`target`) with a mock
    object.

    `patch.object` works as a decorator, a class decorator or a context
    manager.  The arguments `new`, `spec`, `create`, `spec_set`, `autospec`
    and `new_callable` mean the same as for `patch`, and, like `patch`,
    `patch.object` accepts arbitrary keyword arguments to configure the mock
    object it creates.

    When used as a class decorator `patch.object` honours `patch.TEST_PREFIX`
    for choosing which methods to wrap.

    Raises TypeError when `target` is a str instead of the object itself.
    """
    if type(target) is str:
        raise TypeError(
            f"{target!r} must be the actual object to be patched, not a str"
        )
    # The object is already at hand, so the getter simply returns it.
    return _patch(
        lambda: target, attribute, new, spec, create,
        spec_set, autospec, new_callable, kwargs, unsafe=unsafe
    )
1656
1657
def _patch_multiple(target, spec=None, create=False, spec_set=None,
                    autospec=None, new_callable=None, **kwargs):
    """Perform several patches on one target in a single call.

    `target` is the object to patch, given either as the object itself or as
    a string naming it (resolved by importing).  Each keyword argument names
    an attribute to patch and the value to put in its place::

        with patch.multiple(settings, FIRST_PATCH='one', SECOND_PATCH='two'):
            ...

    Pass `DEFAULT` as a value to let `patch.multiple` create the mock.  Mocks
    created this way are passed to a decorated function by keyword, and a
    dictionary of them is returned when `patch.multiple` is used as a context
    manager.

    `patch.multiple` works as a decorator, a class decorator or a context
    manager.  The arguments `spec`, `spec_set`, `create`, `autospec` and
    `new_callable` mean the same as for `patch` and are applied to *all*
    patches made by this call.

    When used as a class decorator `patch.multiple` honours
    `patch.TEST_PREFIX` for choosing which methods to wrap.

    Raises ValueError when no keyword argument is supplied.
    """
    if not kwargs:
        raise ValueError(
            'Must supply at least one keyword argument with patch.multiple'
        )

    if type(target) is str:
        getter = partial(pkgutil.resolve_name, target)
    else:
        getter = lambda: target

    def build(attribute, new):
        # One patcher per attribute, each with its own empty kwargs dict.
        single = _patch(
            getter, attribute, new, spec, create, spec_set,
            autospec, new_callable, {}
        )
        single.attribute_name = attribute
        return single

    # The first patcher is the one returned; the others ride along on it.
    patcher, *others = (build(name, value) for name, value in kwargs.items())
    patcher.additional_patchers.extend(others)
    return patcher
1705
1706
def patch(
        target, new=DEFAULT, spec=None, create=False,
        spec_set=None, autospec=None, new_callable=None, *, unsafe=False, **kwargs
    ):
    """
    `patch` acts as a function decorator, class decorator or a context
    manager. Inside the body of the function or with statement, the `target`
    is patched with a `new` object. When the function/with statement exits
    the patch is undone.

    If `new` is omitted, the target is replaced with an `AsyncMock` if the
    patched object is an async function, or with a `MagicMock` otherwise.
    When `patch` is used as a decorator and `new` is omitted, the created
    mock is passed to the decorated function as an extra argument. When
    `patch` is used as a context manager, the created mock is returned by
    the context manager.

    `target` should be a string in the form `'package.module.ClassName'`.
    The `target` is imported and the specified object replaced with the
    `new` object, so the `target` must be importable from the environment
    you are calling `patch` from. The import happens when the decorated
    function is executed, not at decoration time.

    The `spec` and `spec_set` keyword arguments are passed to the
    `MagicMock` if patch is creating one for you.

    You can also pass `spec=True` or `spec_set=True`, which makes patch use
    the object being mocked as the spec/spec_set object.

    `new_callable` lets you specify a different class, or callable object,
    that will be called to create the `new` object. By default `AsyncMock`
    is used for async functions and `MagicMock` for the rest.

    A more powerful form of `spec` is `autospec`. With `autospec=True` the
    mock is created with a spec taken from the object being replaced, and
    every attribute of the mock also gets the spec of the corresponding
    attribute of that object. Mocked methods and functions have their
    arguments checked and raise a `TypeError` when called with the wrong
    signature. For mocks replacing a class, the return value (the
    'instance') has the same spec as the class.

    Instead of `autospec=True` you can pass `autospec=some_object` to use an
    arbitrary object as the spec instead of the one being replaced.

    By default `patch` fails to replace attributes that don't exist. If you
    pass `create=True` and the attribute doesn't exist, patch creates the
    attribute when the patched function is called and deletes it again
    afterwards. This is useful for writing tests against attributes that
    your production code creates at runtime. It is off by default because it
    can be dangerous: with it switched on you can write passing tests
    against APIs that don't actually exist!

    Patch can be used as a `TestCase` class decorator. It works by
    decorating each test method in the class, which reduces boilerplate when
    your test methods share a common set of patchings. `patch` finds tests
    by looking for method names that start with `patch.TEST_PREFIX`. By
    default this is `test`, which matches the way `unittest` finds tests.
    You can specify an alternative prefix by setting `patch.TEST_PREFIX`.

    Patch can be used as a context manager, with the with statement. The
    patching then applies to the indented block after the with statement.
    If you use "as", the patched object is bound to the name after the
    "as"; very useful if `patch` is creating a mock object for you.

    Patch raises a `RuntimeError` if passed some common misspellings of the
    arguments autospec and spec_set. Pass `unsafe=True` to disable that
    check.

    `patch` takes arbitrary keyword arguments. These are passed to
    `AsyncMock` if the patched object is asynchronous, to `MagicMock`
    otherwise, or to `new_callable` if specified.

    `patch.dict(...)`, `patch.multiple(...)` and `patch.object(...)` are
    available for alternate use-cases.
    """
    resolve_owner, attribute = _get_target(target)
    return _patch(
        resolve_owner, attribute, new, spec, create,
        spec_set, autospec, new_callable, kwargs, unsafe=unsafe
    )
1787
1788
class _patch_dict(object):
    """
    Patch a dictionary, or dictionary like object, and restore the dictionary
    to its original state after the test.

    `in_dict` can be a dictionary or a mapping like container. If it is a
    mapping then it must at least support getting, setting and deleting items
    plus iterating over keys.

    `in_dict` can also be a string specifying the name of the dictionary, which
    will then be fetched by importing it.

    `values` can be a dictionary of values to set in the dictionary. `values`
    can also be an iterable of `(key, value)` pairs.

    If `clear` is True then the dictionary will be cleared before the new
    values are set.

    `patch.dict` can also be called with arbitrary keyword arguments to set
    values in the dictionary::

        with patch.dict('sys.modules', mymodule=Mock(), other_module=Mock()):
            ...

    `patch.dict` can be used as a context manager, decorator or class
    decorator. When used as a class decorator `patch.dict` honours
    `patch.TEST_PREFIX` for choosing which methods to wrap.
    """

    def __init__(self, in_dict, values=(), clear=False, **kwargs):
        self.in_dict = in_dict
        # support any argument supported by dict(...) constructor
        self.values = dict(values)
        self.values.update(kwargs)
        self.clear = clear
        # Snapshot of the dictionary contents; None until the patch has been
        # applied at least once.
        self._original = None


    def __call__(self, f):
        """Apply the patcher as a decorator to a class or (async) callable."""
        if isinstance(f, type):
            return self.decorate_class(f)
        if inspect.iscoroutinefunction(f):
            return self.decorate_async_callable(f)
        return self.decorate_callable(f)


    def decorate_callable(self, f):
        """Wrap `f` so that the dictionary is patched while it runs."""
        @wraps(f)
        def _inner(*args, **kw):
            self._patch_dict()
            try:
                return f(*args, **kw)
            finally:
                self._unpatch_dict()

        return _inner


    def decorate_async_callable(self, f):
        """Wrap coroutine function `f` so that the dictionary is patched
        while it runs."""
        @wraps(f)
        async def _inner(*args, **kw):
            self._patch_dict()
            try:
                return await f(*args, **kw)
            finally:
                self._unpatch_dict()

        return _inner


    def decorate_class(self, klass):
        """Decorate every callable attribute whose name starts with
        `patch.TEST_PREFIX`, each with a fresh patcher.

        The class is modified in place and returned.
        """
        for attr in dir(klass):
            # Check the name first, as _patch.decorate_class does, so that
            # attributes which are not tests are never fetched.
            if not attr.startswith(patch.TEST_PREFIX):
                continue

            attr_value = getattr(klass, attr)
            if not hasattr(attr_value, "__call__"):
                continue

            decorator = _patch_dict(self.in_dict, self.values, self.clear)
            setattr(klass, attr, decorator(attr_value))
        return klass


    def __enter__(self):
        """Patch the dict."""
        self._patch_dict()
        return self.in_dict


    def _patch_dict(self):
        """Snapshot the dictionary, then clear it (if requested) and apply
        the new values."""
        values = self.values
        if isinstance(self.in_dict, str):
            # Resolve the dotted name once; the object replaces the string.
            self.in_dict = pkgutil.resolve_name(self.in_dict)
        in_dict = self.in_dict
        clear = self.clear

        try:
            original = in_dict.copy()
        except AttributeError:
            # dict like object with no copy method
            # must support iteration over keys
            original = {}
            for key in in_dict:
                original[key] = in_dict[key]
        self._original = original

        if clear:
            _clear_dict(in_dict)

        try:
            in_dict.update(values)
        except AttributeError:
            # dict like object with no update method
            for key in values:
                in_dict[key] = values[key]


    def _unpatch_dict(self):
        """Restore the dictionary contents recorded by `_patch_dict`."""
        in_dict = self.in_dict
        original = self._original

        _clear_dict(in_dict)

        try:
            in_dict.update(original)
        except AttributeError:
            # dict like object with no update method
            for key in original:
                in_dict[key] = original[key]


    def __exit__(self, *args):
        """Unpatch the dict."""
        if self._original is not None:
            self._unpatch_dict()
        return False


    def start(self):
        """Activate a patch, returning any created mock."""
        result = self.__enter__()
        _patch._active_patches.append(self)
        return result


    def stop(self):
        """Stop an active patch."""
        try:
            _patch._active_patches.remove(self)
        except ValueError:
            # If the patch hasn't been started this will fail
            return None

        return self.__exit__(None, None, None)
1940
1941
def _clear_dict(in_dict):
    """Remove every item from `in_dict` in place.

    Uses the object's `clear()` method; when that fails with
    AttributeError the keys are deleted one at a time.
    """
    try:
        in_dict.clear()
    except AttributeError:
        # Iterate over a snapshot of the keys, since items are deleted as we go.
        for key in list(in_dict):
            del in_dict[key]
1949
1950
def _patch_stopall():
    """Stop all active patches.

    The patches are stopped last-started-first (LIFO) so that nested patches
    unroll in the right order.
    """
    for active in reversed(_patch._active_patches):
        active.stop()
1955
1956
# Expose the alternative patchers and the test-method prefix as attributes
# of the `patch` function itself.
patch.TEST_PREFIX = 'test'
patch.stopall = _patch_stopall
patch.multiple = _patch_multiple
patch.dict = _patch_dict
patch.object = _patch_object
1962
# Space-separated magic method names, written without the surrounding double
# underscores; `_magics` below expands them to their `__name__` form.
magic_methods = (
    "lt le gt ge eq ne "
    "getitem setitem delitem "
    "len contains iter "
    "hash str sizeof "
    "enter exit "
    # we added divmod and rdivmod here instead of numerics
    # because there is no idivmod
    "divmod rdivmod neg pos abs invert "
    "complex int float index "
    "round trunc floor ceil "
    "bool next "
    "fspath "
    "aiter "
)

# Binary numeric operators; the in-place (`i` prefix) and reflected
# (`r` prefix) variants are derived from this list.
numerics = (
    "add sub mul matmul truediv floordiv mod lshift rshift and xor or pow"
)
inplace = ' '.join('i%s' % n for n in numerics.split())
right = ' '.join('r%s' % n for n in numerics.split())
1984
# not including __prepare__, __instancecheck__, __subclasscheck__
# (as they are metaclass methods)
# __del__ is not supported at all as it causes problems if it exists

# Magic methods that count as magic (they are merged into `_all_sync_magics`)
# but are not part of `_magics`, so `MagicMixin._mock_set_magics` does not
# install them by default.
_non_defaults = {
    '__get__', '__set__', '__delete__', '__reversed__', '__missing__',
    '__reduce__', '__reduce_ex__', '__getinitargs__', '__getnewargs__',
    '__getstate__', '__setstate__', '__getformat__',
    '__repr__', '__dir__', '__subclasses__', '__format__',
    '__getnewargs_ex__',
}
1996
1997
def _get_method(name, func):
    """Turn a callable object (like a mock) into a real function.

    The returned plain function forwards ``self`` and every other argument
    to `func` and carries `name` as its ``__name__``.
    """
    def method(self, /, *args, **kw):
        return func(self, *args, **kw)

    method.__name__ = name
    return method
2004
2005
# `__name__` form of every magic method listed in `magic_methods`, plus the
# numeric operators with their in-place and reflected variants.
_magics = {
    '__%s__' % method for method in
    ' '.join([magic_methods, numerics, inplace, right]).split()
}

# Magic methods that must return awaitables: `__aenter__`/`__aexit__` back
# `async with` statements and `__anext__` backs async iteration
_async_method_magics = {"__aenter__", "__aexit__", "__anext__"}
# Magic methods that are only used with async calls but are synchronous functions themselves
_sync_async_magics = {"__aiter__"}
_async_magics = _async_method_magics | _sync_async_magics

_all_sync_magics = _magics | _non_defaults
_all_magics = _all_sync_magics | _async_magics

# Magic method names that are deliberately not supported.
# NOTE(review): the code that enforces this set is outside this chunk.
_unsupported_magics = {
    '__getattr__', '__setattr__',
    '__init__', '__new__', '__prepare__',
    '__instancecheck__', '__subclasscheck__',
    '__del__'
}
2026
# Magic methods whose default return value is computed from the mock itself;
# `_set_return_value` calls the stored callable with the mock.
_calculate_return_value = {
    '__hash__': lambda self: object.__hash__(self),
    '__str__': lambda self: object.__str__(self),
    '__sizeof__': lambda self: object.__sizeof__(self),
    '__fspath__': lambda self: f"{type(self).__name__}/{self._extract_mock_name()}/{id(self)}",
}

# Magic methods with a fixed default return value; `_set_return_value`
# consults this table before `_calculate_return_value`.
_return_values = {
    '__lt__': NotImplemented,
    '__gt__': NotImplemented,
    '__le__': NotImplemented,
    '__ge__': NotImplemented,
    '__int__': 1,
    '__contains__': False,
    '__len__': 0,
    '__exit__': False,
    '__complex__': 1j,
    '__float__': 1.0,
    '__bool__': True,
    '__index__': 1,
    '__aexit__': False,
}
2049
2050
def _get_eq(self):
    """Build the default ``__eq__`` side effect for the mock `self`.

    A configured return value wins; otherwise the mock equals only itself
    and defers (``NotImplemented``) for everything else.
    """
    def __eq__(other):
        configured = self.__eq__._mock_return_value
        if configured is not DEFAULT:
            return configured
        return True if self is other else NotImplemented
    return __eq__
2060
def _get_ne(self):
    """Build the default ``__ne__`` side effect for the mock `self`.

    When a return value has been configured the side effect answers
    ``DEFAULT``.
    NOTE(review): presumably this makes the call fall back to the configured
    return value, as the async call path in this file does -- confirm against
    the synchronous call path.
    Otherwise the mock is "not unequal" only to itself and defers
    (``NotImplemented``) for everything else.
    """
    def __ne__(other):
        if self.__ne__._mock_return_value is not DEFAULT:
            return DEFAULT
        return False if self is other else NotImplemented
    return __ne__
2069
def _get_iter(self):
    """Build the default ``__iter__`` side effect for the mock `self`.

    Without a configured return value iteration is empty; otherwise the
    configured value is passed through ``iter()``.
    """
    def __iter__():
        ret_val = self.__iter__._mock_return_value
        # if ret_val was already an iterator, then calling iter on it should
        # return the iterator unchanged
        return iter([] if ret_val is DEFAULT else ret_val)
    return __iter__
2079
def _get_async_iter(self):
    """Build the default ``__aiter__`` side effect for the mock `self`.

    The configured return value (or an empty list when none is configured)
    is wrapped in an `_AsyncIterator`.
    """
    def __aiter__():
        ret_val = self.__aiter__._mock_return_value
        return _AsyncIterator(iter([] if ret_val is DEFAULT else ret_val))
    return __aiter__
2087
# Magic methods whose default behaviour is a side effect built from the mock;
# `_set_return_value` calls the factory with the mock and installs the result
# as `side_effect`.
_side_effect_methods = {
    '__eq__': _get_eq,
    '__ne__': _get_ne,
    '__iter__': _get_iter,
    '__aiter__': _get_async_iter
}
2094
2095
2096
def _set_return_value(mock, method, name):
    """Give the magic-method mock `method` its default behaviour.

    `name` is looked up, in order, in the fixed return values, the computed
    return values and the side-effect factories; the first table that knows
    the name configures `method`. Unknown names leave `method` untouched.
    """
    preset = _return_values.get(name, DEFAULT)
    if preset is not DEFAULT:
        method.return_value = preset
        return

    calculator = _calculate_return_value.get(name)
    if calculator is not None:
        method.return_value = calculator(mock)
        return

    make_side_effect = _side_effect_methods.get(name)
    if make_side_effect is not None:
        method.side_effect = make_side_effect(mock)
2112
2113
2114
class MagicMixin(Base):
    """Mixin that installs `MagicProxy` descriptors for the magic methods."""

    def __init__(self, /, *args, **kw):
        self._mock_set_magics()  # make magic work for kwargs in init
        _safe_super(MagicMixin, self).__init__(*args, **kw)
        self._mock_set_magics()  # fix magic broken by upper level init

    def _mock_set_magics(self):
        """Synchronise the magic methods on ``type(self)`` with the spec.

        Every name in `_magics` and `_async_method_magics` is a candidate.
        When ``self._mock_methods`` is set, only the candidates it lists are
        kept and the others are deleted if the type defines them. Candidates
        the type does not define yet get a `MagicProxy`.
        """
        orig_magics = _magics | _async_method_magics
        these_magics = orig_magics

        if getattr(self, "_mock_methods", None) is not None:
            these_magics = orig_magics.intersection(self._mock_methods)

            for entry in orig_magics - these_magics:
                if entry in type(self).__dict__:
                    # remove unneeded magic methods
                    delattr(self, entry)

        # don't overwrite existing attributes if called a second time
        these_magics = these_magics - set(type(self).__dict__)

        _type = type(self)
        for entry in these_magics:
            setattr(_type, entry, MagicProxy(entry, self))
2143
2144
2145
class NonCallableMagicMock(MagicMixin, NonCallableMock):
    """A version of `MagicMock` that isn't callable."""

    def mock_add_spec(self, spec, spec_set=False):
        """Add a spec to a mock.

        `spec` can either be an object or a list of strings. Only attributes
        on the `spec` can be fetched as attributes from the mock.

        If `spec_set` is True then only attributes on the spec can be set.
        """
        self._mock_add_spec(spec, spec_set)
        # re-sync the magic methods with the new spec
        self._mock_set_magics()
2156
2157
# Magic-method mixin listed among AsyncMock's bases; it adds nothing on top
# of MagicMixin.
class AsyncMagicMixin(MagicMixin):
    pass
2160
2161
class MagicMock(MagicMixin, Mock):
    """
    MagicMock is a subclass of Mock with default implementations
    of most of the magic methods. You can use MagicMock without having to
    configure the magic methods yourself.

    If you use the `spec` or `spec_set` arguments then *only* magic
    methods that exist in the spec will be created.

    Attributes and the return value of a `MagicMock` will also be `MagicMocks`.
    """

    def mock_add_spec(self, spec, spec_set=False):
        """Add a spec to a mock.

        `spec` can either be an object or a list of strings. Only attributes
        on the `spec` can be fetched as attributes from the mock.

        If `spec_set` is True then only attributes on the spec can be set.
        """
        self._mock_add_spec(spec, spec_set)
        # re-sync the magic methods with the new spec
        self._mock_set_magics()
2181
2182
2183
class MagicProxy(Base):
    """Descriptor that creates the child mock for one magic method on access."""

    def __init__(self, name, parent):
        self.name = name
        self.parent = parent

    def create_mock(self):
        """Create the child mock, attach it to the parent and return it.

        The child is set as an attribute on the parent and then given its
        default behaviour through `_set_return_value`.
        """
        name, owner = self.name, self.parent
        child = owner._get_child_mock(name=name, _new_name=name,
                                      _new_parent=owner)
        setattr(owner, name, child)
        _set_return_value(owner, child, name)
        return child

    def __get__(self, obj, _type=None):
        return self.create_mock()
2200
2201
# Attribute names and constructor signature of real code objects; used by
# AsyncMockMixin.__init__ to build the fake `__code__` of an async mock.
_CODE_ATTRS = dir(CodeType)
_CODE_SIG = inspect.signature(partial(CodeType.__init__, None))
2204
2205
class AsyncMockMixin(Base):
    """Mixin that makes a mock awaitable-aware.

    It records every await (`await_count`, `await_args`, `await_args_list`),
    implements the asynchronous call path in `_execute_mock_call` and
    provides the ``assert_*await*`` family of assertions.
    """
    # NOTE(review): presumably these forward to the `_mock_await_*` entries
    # that __init__ stores in the instance __dict__ -- confirm against
    # `_delegating_property`, which is defined outside this chunk.
    await_count = _delegating_property('await_count')
    await_args = _delegating_property('await_args')
    await_args_list = _delegating_property('await_args_list')

    def __init__(self, /, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # iscoroutinefunction() checks _is_coroutine property to say if an
        # object is a coroutine. Without this check it looks to see if it is a
        # function/method, which in this case it is not (since it is an
        # AsyncMock).
        # It is set through __dict__ because when spec_set is True, this
        # attribute is likely undefined.
        self.__dict__['_is_coroutine'] = asyncio.coroutines._is_coroutine
        self.__dict__['_mock_await_count'] = 0
        self.__dict__['_mock_await_args'] = None
        self.__dict__['_mock_await_args_list'] = _CallList()
        # Fake code object that advertises a coroutine function taking
        # (*args, **kwargs), plus the function attributes that go with it.
        code_mock = NonCallableMock(spec_set=_CODE_ATTRS)
        code_mock.__dict__["_spec_class"] = CodeType
        code_mock.__dict__["_spec_signature"] = _CODE_SIG
        code_mock.co_flags = (
            inspect.CO_COROUTINE
            + inspect.CO_VARARGS
            + inspect.CO_VARKEYWORDS
        )
        code_mock.co_argcount = 0
        code_mock.co_varnames = ('args', 'kwargs')
        code_mock.co_posonlyargcount = 0
        code_mock.co_kwonlyargcount = 0
        self.__dict__['__code__'] = code_mock
        self.__dict__['__name__'] = 'AsyncMock'
        self.__dict__['__defaults__'] = tuple()
        self.__dict__['__kwdefaults__'] = {}
        self.__dict__['__annotations__'] = None

    async def _execute_mock_call(self, /, *args, **kwargs):
        """Record the await, then produce the result of the mocked call.

        Resolution order: `side_effect` (an exception is raised, a
        non-callable is advanced with ``next()``, a callable is called and
        awaited when it is a coroutine function); if that yields anything but
        ``DEFAULT`` it is returned. Otherwise a configured return value, then
        the wrapped object, then the default return value.
        """
        # This is nearly just like super(), except for special handling
        # of coroutines

        _call = _Call((args, kwargs), two=True)
        self.await_count += 1
        self.await_args = _call
        self.await_args_list.append(_call)

        effect = self.side_effect
        if effect is not None:
            if _is_exception(effect):
                raise effect
            elif not _callable(effect):
                try:
                    result = next(effect)
                except StopIteration:
                    # It is impossible to propagate a StopIteration
                    # through coroutines because of PEP 479
                    raise StopAsyncIteration
                if _is_exception(result):
                    raise result
            elif iscoroutinefunction(effect):
                result = await effect(*args, **kwargs)
            else:
                result = effect(*args, **kwargs)

            # DEFAULT from the side effect means: fall through to the
            # return-value handling below
            if result is not DEFAULT:
                return result

        if self._mock_return_value is not DEFAULT:
            return self.return_value

        if self._mock_wraps is not None:
            if iscoroutinefunction(self._mock_wraps):
                return await self._mock_wraps(*args, **kwargs)
            return self._mock_wraps(*args, **kwargs)

        return self.return_value

    def assert_awaited(self):
        """
        Assert that the mock was awaited at least once.
        """
        if self.await_count == 0:
            msg = f"Expected {self._mock_name or 'mock'} to have been awaited."
            raise AssertionError(msg)

    def assert_awaited_once(self):
        """
        Assert that the mock was awaited exactly once.
        """
        if not self.await_count == 1:
            msg = (f"Expected {self._mock_name or 'mock'} to have been awaited once."
                   f" Awaited {self.await_count} times.")
            raise AssertionError(msg)

    def assert_awaited_with(self, /, *args, **kwargs):
        """
        Assert that the last await was with the specified arguments.
        """
        if self.await_args is None:
            expected = self._format_mock_call_signature(args, kwargs)
            raise AssertionError(f'Expected await: {expected}\nNot awaited')

        def _error_message():
            msg = self._format_mock_failure_message(args, kwargs, action='await')
            return msg

        expected = self._call_matcher(_Call((args, kwargs), two=True))
        actual = self._call_matcher(self.await_args)
        if actual != expected:
            # when the matcher handed back an exception instead of a call,
            # chain it as the cause of the assertion failure
            cause = expected if isinstance(expected, Exception) else None
            raise AssertionError(_error_message()) from cause

    def assert_awaited_once_with(self, /, *args, **kwargs):
        """
        Assert that the mock was awaited exactly once and with the specified
        arguments.
        """
        if not self.await_count == 1:
            msg = (f"Expected {self._mock_name or 'mock'} to have been awaited once."
                   f" Awaited {self.await_count} times.")
            raise AssertionError(msg)
        return self.assert_awaited_with(*args, **kwargs)

    def assert_any_await(self, /, *args, **kwargs):
        """
        Assert the mock has ever been awaited with the specified arguments.
        """
        expected = self._call_matcher(_Call((args, kwargs), two=True))
        cause = expected if isinstance(expected, Exception) else None
        actual = [self._call_matcher(c) for c in self.await_args_list]
        if cause or expected not in _AnyComparer(actual):
            expected_string = self._format_mock_call_signature(args, kwargs)
            raise AssertionError(
                '%s await not found' % expected_string
            ) from cause

    def assert_has_awaits(self, calls, any_order=False):
        """
        Assert the mock has been awaited with the specified calls.
        The :attr:`await_args_list` list is checked for the awaits.

        If `any_order` is False (the default) then the awaits must be
        sequential. There can be extra calls before or after the
        specified awaits.

        If `any_order` is True then the awaits can be in any order, but
        they must all appear in :attr:`await_args_list`.
        """
        expected = [self._call_matcher(c) for c in calls]
        # first expected entry that the matcher turned into an exception
        cause = next((e for e in expected if isinstance(e, Exception)), None)
        all_awaits = _CallList(self._call_matcher(c) for c in self.await_args_list)
        if not any_order:
            if expected not in all_awaits:
                if cause is None:
                    problem = 'Awaits not found.'
                else:
                    problem = ('Error processing expected awaits.\n'
                               'Errors: {}').format(
                                   [e if isinstance(e, Exception) else None
                                    for e in expected])
                raise AssertionError(
                    f'{problem}\n'
                    f'Expected: {_CallList(calls)}\n'
                    f'Actual: {self.await_args_list}'
                ) from cause
            return

        all_awaits = list(all_awaits)

        # consume one recorded await per expected call so that duplicates in
        # `calls` need matching duplicates in the await list
        not_found = []
        for kall in expected:
            try:
                all_awaits.remove(kall)
            except ValueError:
                not_found.append(kall)
        if not_found:
            raise AssertionError(
                '%r not all found in await list' % (tuple(not_found),)
            ) from cause

    def assert_not_awaited(self):
        """
        Assert that the mock was never awaited.
        """
        if self.await_count != 0:
            msg = (f"Expected {self._mock_name or 'mock'} to not have been awaited."
                   f" Awaited {self.await_count} times.")
            raise AssertionError(msg)

    def reset_mock(self, /, *args, **kwargs):
        """
        See :func:`.Mock.reset_mock()`

        The await bookkeeping (count, last args and args list) is cleared
        as well.
        """
        super().reset_mock(*args, **kwargs)
        self.await_count = 0
        self.await_args = None
        self.await_args_list = _CallList()
2401
2402
class AsyncMock(AsyncMockMixin, AsyncMagicMixin, Mock):
    """
    Enhance :class:`Mock` with features allowing to mock
    an async function.

    The :class:`AsyncMock` object will behave so the object is
    recognized as an async function, and the result of a call is an awaitable:

    >>> mock = AsyncMock()
    >>> iscoroutinefunction(mock)
    True
    >>> inspect.isawaitable(mock())
    True


    The result of ``mock()`` is an async function which will have the outcome
    of ``side_effect`` or ``return_value``:

    - if ``side_effect`` is a function, the async function will return the
      result of that function,
    - if ``side_effect`` is an exception, the async function will raise the
      exception,
    - if ``side_effect`` is an iterable, the async function will return the
      next value of the iterable, however, if the sequence of result is
      exhausted, ``StopAsyncIteration`` is raised immediately,
    - if ``side_effect`` is not defined, the async function will return the
      value defined by ``return_value``, hence, by default, the async function
      returns a new :class:`AsyncMock` object.

    If the outcome of ``side_effect`` or ``return_value`` is an async function,
    the mock async function obtained when the mock object is called will be this
    async function itself (and not an async function returning an async
    function).

    The test author can also specify a wrapped object with ``wraps``. In this
    case, the :class:`Mock` object behavior is the same as with an
    :class:`.Mock` object: the wrapped object may have methods
    defined as async functions.

    Based on Martin Richard's asynctest project.
    """
2444
2445
class _ANY:
    """A helper object that compares equal to everything."""

    def __eq__(self, other):
        # equal to whatever it is compared with
        return True

    def __ne__(self, other):
        # ... and therefore never unequal
        return False

    def __repr__(self):
        return '<ANY>'


ANY = _ANY()
2459
2460
2461
def _format_call_signature(name, args, kwargs):
    """Render a call as ``name(arg, ..., key=value, ...)``.

    `args` is a sequence of positional arguments and `kwargs` a mapping of
    keyword arguments; every value is shown with ``repr()``. `name` is
    inserted verbatim, so a name containing ``%`` is rendered as-is instead
    of being treated as a format string.
    """
    args_string = ', '.join(map(repr, args))
    kwargs_string = ', '.join(
        f'{key!s}={value!r}' for key, value in kwargs.items()
    )
    # join only the non-empty halves so no stray separator is produced
    formatted_args = ', '.join(filter(None, (args_string, kwargs_string)))
    return f'{name!s}({formatted_args})'
2477
2478
2479
class _Call(tuple):
    """
    A tuple for holding the results of a call to a mock, either in the form
    `(args, kwargs)` or `(name, args, kwargs)`.

    If args or kwargs are empty then a call tuple will compare equal to
    a tuple without those values. This makes comparisons less verbose::

        _Call(('name', (), {})) == ('name',)
        _Call(('name', (1,), {})) == ('name', (1,))
        _Call(((), {'a': 'b'})) == ({'a': 'b'},)

    The `_Call` object provides a useful shortcut for comparing with call::

        _Call(((1, 2), {'a': 3})) == call(1, 2, a=3)
        _Call(('foo', (1, 2), {'a': 3})) == call.foo(1, 2, a=3)

    If the _Call has no name then it will match any name.
    """
    def __new__(cls, value=(), name='', parent=None, two=False,
                from_kall=True):
        # Normalise `value` (1, 2 or 3 items) into name/args/kwargs: a str is
        # taken as the name, a tuple as args and anything else as kwargs.
        # Any other length keeps the defaults.
        args = ()
        kwargs = {}
        _len = len(value)
        if _len == 3:
            name, args, kwargs = value
        elif _len == 2:
            first, second = value
            if isinstance(first, str):
                name = first
                if isinstance(second, tuple):
                    args = second
                else:
                    kwargs = second
            else:
                args, kwargs = first, second
        elif _len == 1:
            value, = value
            if isinstance(value, str):
                name = value
            elif isinstance(value, tuple):
                args = value
            else:
                kwargs = value

        # `two` selects the nameless (args, kwargs) storage form
        if two:
            return tuple.__new__(cls, (args, kwargs))

        return tuple.__new__(cls, (name, args, kwargs))


    def __init__(self, value=(), name=None, parent=None, two=False,
                 from_kall=True):
        # The tuple content was fixed in __new__; only the bookkeeping
        # attributes are stored here.
        self._mock_name = name
        self._mock_parent = parent
        self._mock_from_kall = from_kall


    def __eq__(self, other):
        # Compare with another call or a plain tuple of length 0-3, using the
        # same shape rules as __new__ to interpret `other`.
        try:
            len_other = len(other)
        except TypeError:
            return NotImplemented

        self_name = ''
        if len(self) == 2:
            self_args, self_kwargs = self
        else:
            self_name, self_args, self_kwargs = self

        # when both sides have a (truthy) parent, the parents must match too
        if (getattr(self, '_mock_parent', None) and getattr(other, '_mock_parent', None)
                and self._mock_parent != other._mock_parent):
            return False

        other_name = ''
        if len_other == 0:
            other_args, other_kwargs = (), {}
        elif len_other == 3:
            other_name, other_args, other_kwargs = other
        elif len_other == 1:
            value, = other
            if isinstance(value, tuple):
                other_args = value
                other_kwargs = {}
            elif isinstance(value, str):
                other_name = value
                other_args, other_kwargs = (), {}
            else:
                other_args = ()
                other_kwargs = value
        elif len_other == 2:
            # could be (name, args) or (name, kwargs) or (args, kwargs)
            first, second = other
            if isinstance(first, str):
                other_name = first
                if isinstance(second, tuple):
                    other_args, other_kwargs = second, {}
                else:
                    other_args, other_kwargs = (), second
            else:
                other_args, other_kwargs = first, second
        else:
            return False

        # a nameless call matches any name
        if self_name and other_name != self_name:
            return False

        # this order is important for ANY to work!
        return (other_args, other_kwargs) == (self_args, self_kwargs)


    __ne__ = object.__ne__


    def __call__(self, /, *args, **kwargs):
        # Calling a call object builds the record of that call; named calls
        # keep a link to this object as their parent.
        if self._mock_name is None:
            return _Call(('', args, kwargs), name='()')

        name = self._mock_name + '()'
        return _Call((self._mock_name, args, kwargs), name=name, parent=self)


    def __getattr__(self, attr):
        # Attribute access builds a dotted call name (e.g. `call.foo.bar`);
        # the result is not itself a recorded call (from_kall=False).
        if self._mock_name is None:
            return _Call(name=attr, from_kall=False)
        name = '%s.%s' % (self._mock_name, attr)
        return _Call(name=name, parent=self, from_kall=False)


    def __getattribute__(self, attr):
        # Hide everything defined on `tuple` itself: raising AttributeError
        # here makes the lookup fall through to __getattr__ above.
        if attr in tuple.__dict__:
            raise AttributeError
        return tuple.__getattribute__(self, attr)


    def _get_call_arguments(self):
        # Return (args, kwargs) for both the 2- and the 3-item storage form.
        if len(self) == 2:
            args, kwargs = self
        else:
            name, args, kwargs = self

        return args, kwargs

    @property
    def args(self):
        # positional arguments of the call
        return self._get_call_arguments()[0]

    @property
    def kwargs(self):
        # keyword arguments of the call
        return self._get_call_arguments()[1]

    def __repr__(self):
        # Objects built by attribute access show just their dotted name.
        if not self._mock_from_kall:
            name = self._mock_name or 'call'
            if name.startswith('()'):
                name = 'call%s' % name
            return name

        if len(self) == 2:
            name = 'call'
            args, kwargs = self
        else:
            name, args, kwargs = self
            if not name:
                name = 'call'
            elif not name.startswith('()'):
                name = 'call.%s' % name
            else:
                name = 'call%s' % name
        return _format_call_signature(name, args, kwargs)


    def call_list(self):
        """For a call object that represents multiple calls, `call_list`
        returns a list of all the intermediate calls as well as the
        final call."""
        # walk up the parent chain, keeping only actual calls, then reverse
        # so the outermost call comes first
        vals = []
        thing = self
        while thing is not None:
            if thing._mock_from_kall:
                vals.append(thing)
            thing = thing._mock_parent
        return _CallList(reversed(vals))
2663
2664
# The public `call` helper: a nameless _Call that is not itself a recorded
# call (from_kall=False); calling it or accessing attributes on it builds
# call objects to compare against.
call = _Call(from_kall=False)
2666
2667
def create_autospec(spec, spec_set=False, instance=False, _parent=None,
                    _name=None, *, unsafe=False, **kwargs):
    """Create a mock object using another object as a spec. Attributes on the
    mock will use the corresponding attribute on the `spec` object as their
    spec.

    Functions or methods being mocked will have their arguments checked
    to check that they are called with the correct signature.

    If `spec_set` is True then attempting to set attributes that don't exist
    on the spec object will raise an `AttributeError`.

    If a class is used as a spec then the return value of the mock (the
    instance of the class) will have the same spec. You can use a class as the
    spec for an instance object by passing `instance=True`. The returned mock
    will only be callable if instances of the mock are callable.

    `create_autospec` will raise a `RuntimeError` if passed some common
    misspellings of the arguments autospec and spec_set. Pass the argument
    `unsafe` with the value True to disable that check.

    `create_autospec` also takes arbitrary keyword arguments that are passed to
    the constructor of the created mock."""
    if _is_list(spec):
        # can't pass a list instance to the mock constructor as it will be
        # interpreted as a list of strings
        spec = type(spec)

    is_type = isinstance(spec, type)
    if _is_instance_mock(spec):
        raise InvalidSpecError(f'Cannot autospec a Mock object. '
                               f'[object={spec!r}]')
    is_async_func = _is_async_func(spec)
    # `_kwargs` collects the constructor arguments for the mock created here
    _kwargs = {'spec': spec}
    if spec_set:
        _kwargs = {'spec_set': spec}
    elif spec is None:
        # None we mock with a normal mock without a spec
        _kwargs = {}
    if _kwargs and instance:
        _kwargs['_spec_as_instance'] = True
    if not unsafe:
        _check_spec_arg_typos(kwargs)

    _kwargs.update(kwargs)

    # pick the mock class that matches what `spec` is
    Klass = MagicMock
    if inspect.isdatadescriptor(spec):
        # descriptors don't have a spec
        # because we don't know what type they return
        _kwargs = {}
    elif is_async_func:
        if instance:
            raise RuntimeError("Instance can not be True when create_autospec "
                               "is mocking an async function")
        Klass = AsyncMock
    elif not _callable(spec):
        Klass = NonCallableMagicMock
    elif is_type and instance and not _instance_callable(spec):
        Klass = NonCallableMagicMock

    # an explicit `name` keyword argument overrides the internal `_name`
    _name = _kwargs.pop('name', _name)

    _new_name = _name
    if _parent is None:
        # for a top level object no _new_name should be set
        _new_name = ''

    mock = Klass(parent=_parent, _new_parent=_parent, _new_name=_new_name,
                 name=_name, **_kwargs)

    if isinstance(spec, FunctionTypes):
        # should only happen at the top level because we don't
        # recurse for functions
        mock = _set_signature(mock, spec)
        if is_async_func:
            _setup_async_mock(mock)
    else:
        _check_signature(spec, mock, is_type, instance)

    if _parent is not None and not instance:
        _parent._mock_children[_name] = mock

    # calling a mocked class returns an autospecced instance, unless the
    # caller supplied their own return_value
    if is_type and not instance and 'return_value' not in kwargs:
        mock.return_value = create_autospec(spec, spec_set, instance=True,
                                            _name='()', _parent=mock)

    for entry in dir(spec):
        if _is_magic(entry):
            # MagicMock already does the useful magic methods for us
            continue

        # XXXX do we need a better way of getting attributes without
        # triggering code execution (?) Probably not - we need the actual
        # object to mock it so we would rather trigger a property than mock
        # the property descriptor. Likewise we want to mock out dynamically
        # provided attributes.
        # XXXX what about attributes that raise exceptions other than
        # AttributeError on being fetched?
        # we could be resilient against it, or catch and propagate the
        # exception when the attribute is fetched from the mock
        try:
            original = getattr(spec, entry)
        except AttributeError:
            continue

        # NOTE: this rebinds `kwargs`; the caller-supplied keyword arguments
        # were already merged into `_kwargs` above
        kwargs = {'spec': original}
        if spec_set:
            kwargs = {'spec_set': original}

        if not isinstance(original, FunctionTypes):
            # non-function attributes only get a _SpecState placeholder here
            new = _SpecState(original, spec_set, mock, entry, instance)
            mock._mock_children[entry] = new
        else:
            parent = mock
            if isinstance(spec, FunctionTypes):
                parent = mock.mock

            skipfirst = _must_skip(spec, entry, is_type)
            kwargs['_eat_self'] = skipfirst
            if iscoroutinefunction(original):
                child_klass = AsyncMock
            else:
                child_klass = MagicMock
            new = child_klass(parent=parent, name=entry, _new_name=entry,
                              _new_parent=parent,
                              **kwargs)
            mock._mock_children[entry] = new
            new.return_value = child_klass()
            _check_signature(original, new, skipfirst=skipfirst)

        # so functions created with _set_signature become instance attributes,
        # *plus* their underlying mock exists in _mock_children of the parent
        # mock. Adding to _mock_children may be unnecessary where we are also
        # setting as an instance attribute?
        if isinstance(new, FunctionTypes):
            setattr(mock, entry, new)

    return mock
2807
2808
def _must_skip(spec, entry, is_type):
    """
    Return whether we should skip the first argument on spec's `entry`
    attribute.

    Instance attributes, static methods, class methods and non-function
    class attributes are never skipped; plain functions and dynamically
    provided attributes answer `is_type`.
    """
    if not isinstance(spec, type):
        if entry in getattr(spec, '__dict__', {}):
            # instance attribute - shouldn't skip
            return False
        spec = spec.__class__

    for klass in spec.__mro__:
        found = klass.__dict__.get(entry, DEFAULT)
        if found is not DEFAULT:
            if isinstance(found, (staticmethod, classmethod)):
                return False
            # Normal method => skip if looked up on type
            # (if looked up on instance, self is already skipped)
            return is_type if isinstance(found, FunctionTypes) else False

    # function is a dynamically provided attribute
    return is_type
2835
2836
class _SpecState:
    """Plain record of the arguments for an attribute's autospec."""

    def __init__(self, spec, spec_set=False, parent=None,
                 name=None, ids=None, instance=False):
        # every argument is stored unchanged under its own name
        self.spec = spec
        self.ids = ids
        self.spec_set = spec_set
        self.parent = parent
        self.instance = instance
        self.name = name
2847
2848
# Types treated as "functions" by the autospec code; usable directly as the
# second argument of isinstance().
FunctionTypes = (
    # python function
    type(create_autospec),
    # instance method
    type(ANY.__eq__),
)
2855
2856
# Attribute-name lists used as specs by mock_open(); filled in on its first
# call and reused afterwards.
file_spec = None
open_spec = None
2859
2860
def _to_stream(read_data):
    """Wrap `read_data` in an in-memory stream.

    `bytes` input gives an `io.BytesIO`; anything else is handed to
    `io.StringIO`.
    """
    stream_type = io.BytesIO if isinstance(read_data, bytes) else io.StringIO
    return stream_type(read_data)
2866
2867
def mock_open(mock=None, read_data=''):
    """
    A helper function to create a mock to replace the use of `open`. It works
    for `open` called directly or used as a context manager.

    The `mock` argument is the mock object to configure. If `None` (the
    default) then a `MagicMock` will be created for you, with the API limited
    to methods or attributes available on standard file handles.

    `read_data` is a string for the `read`, `readline` and `readlines` of the
    file handle to return. This is an empty string by default. A `bytes`
    value is also accepted; it is served through an `io.BytesIO` instead of
    an `io.StringIO`.
    """
    _read_data = _to_stream(read_data)
    # _state[0]: the current data stream; _state[1]: the generator installed
    # as the readline side effect
    _state = [_read_data, None]

    # Each side effect below prefers an explicitly configured return_value
    # and otherwise reads from the current data stream.
    def _readlines_side_effect(*args, **kwargs):
        if handle.readlines.return_value is not None:
            return handle.readlines.return_value
        return _state[0].readlines(*args, **kwargs)

    def _read_side_effect(*args, **kwargs):
        if handle.read.return_value is not None:
            return handle.read.return_value
        return _state[0].read(*args, **kwargs)

    def _readline_side_effect(*args, **kwargs):
        yield from _iter_side_effect()
        # once the stream is exhausted keep answering with whatever
        # readline() on it returns
        while True:
            yield _state[0].readline(*args, **kwargs)

    def _iter_side_effect():
        if handle.readline.return_value is not None:
            while True:
                yield handle.readline.return_value
        for line in _state[0]:
            yield line

    def _next_side_effect():
        if handle.readline.return_value is not None:
            return handle.readline.return_value
        return next(_state[0])

    # the spec lists are computed once and cached in module globals
    global file_spec
    if file_spec is None:
        import _io
        file_spec = list(set(dir(_io.TextIOWrapper)).union(set(dir(_io.BytesIO))))

    global open_spec
    if open_spec is None:
        import _io
        open_spec = list(set(dir(_io.open)))
    if mock is None:
        mock = MagicMock(name='open', spec=open_spec)

    handle = MagicMock(spec=file_spec)
    # `with open(...) as f` yields the handle itself
    handle.__enter__.return_value = handle

    # None marks "not configured", which the side effects above check for
    handle.write.return_value = None
    handle.read.return_value = None
    handle.readline.return_value = None
    handle.readlines.return_value = None

    handle.read.side_effect = _read_side_effect
    _state[1] = _readline_side_effect()
    handle.readline.side_effect = _state[1]
    handle.readlines.side_effect = _readlines_side_effect
    handle.__iter__.side_effect = _iter_side_effect
    handle.__next__.side_effect = _next_side_effect

    def reset_data(*args, **kwargs):
        # runs on every call of the open mock: start over with fresh data
        _state[0] = _to_stream(read_data)
        if handle.readline.side_effect == _state[1]:
            # Only reset the side effect if the user hasn't overridden it.
            _state[1] = _readline_side_effect()
            handle.readline.side_effect = _state[1]
        return DEFAULT

    mock.side_effect = reset_data
    mock.return_value = handle
    return mock
2948
2949
class PropertyMock(Mock):
    """
    A mock intended to be used as a property, or other descriptor, on a class.
    `PropertyMock` provides `__get__` and `__set__` methods so you can specify
    a return value when it is fetched.

    Fetching a `PropertyMock` instance from an object calls the mock, with
    no args. Setting it calls the mock with the value being set.
    """

    def _get_child_mock(self, /, **kwargs):
        # children are MagicMocks, not further PropertyMocks
        return MagicMock(**kwargs)

    def __get__(self, obj, obj_type=None):
        # reading the attribute is recorded as a call without arguments
        return self()

    def __set__(self, obj, val):
        # writing the attribute is recorded as a call with the new value
        self(val)
2966
2967
def seal(mock):
    """Disable the automatic generation of child mocks.

    Given an input Mock, seals it to ensure no further mocks will be generated
    when accessing an attribute that was not already defined.

    The operation recursively seals the mock passed in, meaning that
    the mock itself, any mocks generated by accessing one of its attributes,
    and all assigned mocks without a name or spec will be sealed.
    """
    mock._mock_sealed = True
    for attr in dir(mock):
        try:
            child = getattr(mock, attr)
        except AttributeError:
            continue
        # recurse only into mocks that are not autospec placeholders and
        # whose parent is the mock being sealed
        if (isinstance(child, NonCallableMock)
                and not isinstance(child._mock_children.get(attr), _SpecState)
                and child._mock_new_parent is mock):
            seal(child)
2990
2991
class _AsyncIterator:
    """
    Wraps an iterator in an asynchronous iterator.
    """

    def __init__(self, iterator):
        self.iterator = iterator
        # fake code object flagged as an iterable coroutine
        fake_code = NonCallableMock(spec_set=CodeType)
        fake_code.co_flags = inspect.CO_ITERABLE_COROUTINE
        self.__dict__['__code__'] = fake_code

    async def __anext__(self):
        try:
            item = next(self.iterator)
        except StopIteration:
            # exhaustion is signalled below, outside the handler, so the
            # StopIteration is not chained onto the new exception
            pass
        else:
            return item
        raise StopAsyncIteration