1 """functools.py - Tools for working with functions and callable objects
2 """
3 # Python module wrapper for _functools C module
4 # to allow utilities written in Python to be added
5 # to the functools module.
6 # Written by Nick Coghlan <ncoghlan at gmail.com>,
7 # Raymond Hettinger <python at rcn.com>,
8 # and Ćukasz Langa <lukasz at langa.pl>.
9 # Copyright (C) 2006-2013 Python Software Foundation.
10 # See C source code for _functools credits/copyright
11
12 __all__ = ['update_wrapper', 'wraps', 'WRAPPER_ASSIGNMENTS', 'WRAPPER_UPDATES',
13 'total_ordering', 'cache', 'cmp_to_key', 'lru_cache', 'reduce',
14 'partial', 'partialmethod', 'singledispatch', 'singledispatchmethod',
15 'cached_property']
16
17 from abc import get_cache_token
18 from collections import namedtuple
19 # import types, weakref # Deferred to single_dispatch()
20 from reprlib import recursive_repr
21 from _thread import RLock
22 from types import GenericAlias
23
24
25 ################################################################################
26 ### update_wrapper() and wraps() decorator
27 ################################################################################
28
29 # update_wrapper() and wraps() are tools to help write
30 # wrapper functions that can handle naive introspection
31
32 WRAPPER_ASSIGNMENTS = ('__module__', '__name__', '__qualname__', '__doc__',
33 '__annotations__', '__type_params__')
34 WRAPPER_UPDATES = ('__dict__',)
35 def update_wrapper(wrapper,
36 wrapped,
37 assigned = WRAPPER_ASSIGNMENTS,
38 updated = WRAPPER_UPDATES):
39 """Update a wrapper function to look like the wrapped function
40
41 wrapper is the function to be updated
42 wrapped is the original function
43 assigned is a tuple naming the attributes assigned directly
44 from the wrapped function to the wrapper function (defaults to
45 functools.WRAPPER_ASSIGNMENTS)
46 updated is a tuple naming the attributes of the wrapper that
47 are updated with the corresponding attribute from the wrapped
48 function (defaults to functools.WRAPPER_UPDATES)
49 """
50 for attr in assigned:
51 try:
52 value = getattr(wrapped, attr)
53 except AttributeError:
54 pass
55 else:
56 setattr(wrapper, attr, value)
57 for attr in updated:
58 getattr(wrapper, attr).update(getattr(wrapped, attr, {}))
59 # Issue #17482: set __wrapped__ last so we don't inadvertently copy it
60 # from the wrapped function when updating __dict__
61 wrapper.__wrapped__ = wrapped
62 # Return the wrapper so this can be used as a decorator via partial()
63 return wrapper
64
65 def wraps(wrapped,
66 assigned = WRAPPER_ASSIGNMENTS,
67 updated = WRAPPER_UPDATES):
68 """Decorator factory to apply update_wrapper() to a wrapper function
69
70 Returns a decorator that invokes update_wrapper() with the decorated
71 function as the wrapper argument and the arguments to wraps() as the
72 remaining arguments. Default arguments are as for update_wrapper().
73 This is a convenience function to simplify applying partial() to
74 update_wrapper().
75 """
76 return partial(update_wrapper, wrapped=wrapped,
77 assigned=assigned, updated=updated)
78
79
80 ################################################################################
81 ### total_ordering class decorator
82 ################################################################################
83
84 # The total ordering functions all invoke the root magic method directly
85 # rather than using the corresponding operator. This avoids possible
86 # infinite recursion that could occur when the operator dispatch logic
87 # detects a NotImplemented result and then calls a reflected method.
88
89 def _gt_from_lt(self, other):
90 'Return a > b. Computed by @total_ordering from (not a < b) and (a != b).'
91 op_result = type(self).__lt__(self, other)
92 if op_result is NotImplemented:
93 return op_result
94 return not op_result and self != other
95
96 def _le_from_lt(self, other):
97 'Return a <= b. Computed by @total_ordering from (a < b) or (a == b).'
98 op_result = type(self).__lt__(self, other)
99 if op_result is NotImplemented:
100 return op_result
101 return op_result or self == other
102
103 def _ge_from_lt(self, other):
104 'Return a >= b. Computed by @total_ordering from (not a < b).'
105 op_result = type(self).__lt__(self, other)
106 if op_result is NotImplemented:
107 return op_result
108 return not op_result
109
110 def _ge_from_le(self, other):
111 'Return a >= b. Computed by @total_ordering from (not a <= b) or (a == b).'
112 op_result = type(self).__le__(self, other)
113 if op_result is NotImplemented:
114 return op_result
115 return not op_result or self == other
116
117 def _lt_from_le(self, other):
118 'Return a < b. Computed by @total_ordering from (a <= b) and (a != b).'
119 op_result = type(self).__le__(self, other)
120 if op_result is NotImplemented:
121 return op_result
122 return op_result and self != other
123
124 def _gt_from_le(self, other):
125 'Return a > b. Computed by @total_ordering from (not a <= b).'
126 op_result = type(self).__le__(self, other)
127 if op_result is NotImplemented:
128 return op_result
129 return not op_result
130
131 def _lt_from_gt(self, other):
132 'Return a < b. Computed by @total_ordering from (not a > b) and (a != b).'
133 op_result = type(self).__gt__(self, other)
134 if op_result is NotImplemented:
135 return op_result
136 return not op_result and self != other
137
138 def _ge_from_gt(self, other):
139 'Return a >= b. Computed by @total_ordering from (a > b) or (a == b).'
140 op_result = type(self).__gt__(self, other)
141 if op_result is NotImplemented:
142 return op_result
143 return op_result or self == other
144
145 def _le_from_gt(self, other):
146 'Return a <= b. Computed by @total_ordering from (not a > b).'
147 op_result = type(self).__gt__(self, other)
148 if op_result is NotImplemented:
149 return op_result
150 return not op_result
151
152 def _le_from_ge(self, other):
153 'Return a <= b. Computed by @total_ordering from (not a >= b) or (a == b).'
154 op_result = type(self).__ge__(self, other)
155 if op_result is NotImplemented:
156 return op_result
157 return not op_result or self == other
158
159 def _gt_from_ge(self, other):
160 'Return a > b. Computed by @total_ordering from (a >= b) and (a != b).'
161 op_result = type(self).__ge__(self, other)
162 if op_result is NotImplemented:
163 return op_result
164 return op_result and self != other
165
166 def _lt_from_ge(self, other):
167 'Return a < b. Computed by @total_ordering from (not a >= b).'
168 op_result = type(self).__ge__(self, other)
169 if op_result is NotImplemented:
170 return op_result
171 return not op_result
172
173 _convert = {
174 '__lt__': [('__gt__', _gt_from_lt),
175 ('__le__', _le_from_lt),
176 ('__ge__', _ge_from_lt)],
177 '__le__': [('__ge__', _ge_from_le),
178 ('__lt__', _lt_from_le),
179 ('__gt__', _gt_from_le)],
180 '__gt__': [('__lt__', _lt_from_gt),
181 ('__ge__', _ge_from_gt),
182 ('__le__', _le_from_gt)],
183 '__ge__': [('__le__', _le_from_ge),
184 ('__gt__', _gt_from_ge),
185 ('__lt__', _lt_from_ge)]
186 }
187
188 def total_ordering(cls):
189 """Class decorator that fills in missing ordering methods"""
190 # Find user-defined comparisons (not those inherited from object).
191 roots = {op for op in _convert if getattr(cls, op, None) is not getattr(object, op, None)}
192 if not roots:
193 raise ValueError('must define at least one ordering operation: < > <= >=')
194 root = max(roots) # prefer __lt__ to __le__ to __gt__ to __ge__
195 for opname, opfunc in _convert[root]:
196 if opname not in roots:
197 opfunc.__name__ = opname
198 setattr(cls, opname, opfunc)
199 return cls
200
201
202 ################################################################################
203 ### cmp_to_key() function converter
204 ################################################################################
205
206 def cmp_to_key(mycmp):
207 """Convert a cmp= function into a key= function"""
208 class ESC[4;38;5;81mK(ESC[4;38;5;149mobject):
209 __slots__ = ['obj']
210 def __init__(self, obj):
211 self.obj = obj
212 def __lt__(self, other):
213 return mycmp(self.obj, other.obj) < 0
214 def __gt__(self, other):
215 return mycmp(self.obj, other.obj) > 0
216 def __eq__(self, other):
217 return mycmp(self.obj, other.obj) == 0
218 def __le__(self, other):
219 return mycmp(self.obj, other.obj) <= 0
220 def __ge__(self, other):
221 return mycmp(self.obj, other.obj) >= 0
222 __hash__ = None
223 return K
224
225 try:
226 from _functools import cmp_to_key
227 except ImportError:
228 pass
229
230
231 ################################################################################
232 ### reduce() sequence to a single item
233 ################################################################################
234
235 _initial_missing = object()
236
237 def reduce(function, sequence, initial=_initial_missing):
238 """
239 reduce(function, iterable[, initial]) -> value
240
241 Apply a function of two arguments cumulatively to the items of a sequence
242 or iterable, from left to right, so as to reduce the iterable to a single
243 value. For example, reduce(lambda x, y: x+y, [1, 2, 3, 4, 5]) calculates
244 ((((1+2)+3)+4)+5). If initial is present, it is placed before the items
245 of the iterable in the calculation, and serves as a default when the
246 iterable is empty.
247 """
248
249 it = iter(sequence)
250
251 if initial is _initial_missing:
252 try:
253 value = next(it)
254 except StopIteration:
255 raise TypeError(
256 "reduce() of empty iterable with no initial value") from None
257 else:
258 value = initial
259
260 for element in it:
261 value = function(value, element)
262
263 return value
264
265 try:
266 from _functools import reduce
267 except ImportError:
268 pass
269
270
271 ################################################################################
272 ### partial() argument application
273 ################################################################################
274
275 # Purely functional, no descriptor behaviour
276 class ESC[4;38;5;81mpartial:
277 """New function with partial application of the given arguments
278 and keywords.
279 """
280
281 __slots__ = "func", "args", "keywords", "__dict__", "__weakref__"
282
283 def __new__(cls, func, /, *args, **keywords):
284 if not callable(func):
285 raise TypeError("the first argument must be callable")
286
287 if hasattr(func, "func"):
288 args = func.args + args
289 keywords = {**func.keywords, **keywords}
290 func = func.func
291
292 self = super(partial, cls).__new__(cls)
293
294 self.func = func
295 self.args = args
296 self.keywords = keywords
297 return self
298
299 def __call__(self, /, *args, **keywords):
300 keywords = {**self.keywords, **keywords}
301 return self.func(*self.args, *args, **keywords)
302
303 @recursive_repr()
304 def __repr__(self):
305 qualname = type(self).__qualname__
306 args = [repr(self.func)]
307 args.extend(repr(x) for x in self.args)
308 args.extend(f"{k}={v!r}" for (k, v) in self.keywords.items())
309 if type(self).__module__ == "functools":
310 return f"functools.{qualname}({', '.join(args)})"
311 return f"{qualname}({', '.join(args)})"
312
313 def __reduce__(self):
314 return type(self), (self.func,), (self.func, self.args,
315 self.keywords or None, self.__dict__ or None)
316
317 def __setstate__(self, state):
318 if not isinstance(state, tuple):
319 raise TypeError("argument to __setstate__ must be a tuple")
320 if len(state) != 4:
321 raise TypeError(f"expected 4 items in state, got {len(state)}")
322 func, args, kwds, namespace = state
323 if (not callable(func) or not isinstance(args, tuple) or
324 (kwds is not None and not isinstance(kwds, dict)) or
325 (namespace is not None and not isinstance(namespace, dict))):
326 raise TypeError("invalid partial state")
327
328 args = tuple(args) # just in case it's a subclass
329 if kwds is None:
330 kwds = {}
331 elif type(kwds) is not dict: # XXX does it need to be *exactly* dict?
332 kwds = dict(kwds)
333 if namespace is None:
334 namespace = {}
335
336 self.__dict__ = namespace
337 self.func = func
338 self.args = args
339 self.keywords = kwds
340
341 try:
342 from _functools import partial
343 except ImportError:
344 pass
345
346 # Descriptor version
347 class ESC[4;38;5;81mpartialmethod(ESC[4;38;5;149mobject):
348 """Method descriptor with partial application of the given arguments
349 and keywords.
350
351 Supports wrapping existing descriptors and handles non-descriptor
352 callables as instance methods.
353 """
354
355 def __init__(self, func, /, *args, **keywords):
356 if not callable(func) and not hasattr(func, "__get__"):
357 raise TypeError("{!r} is not callable or a descriptor"
358 .format(func))
359
360 # func could be a descriptor like classmethod which isn't callable,
361 # so we can't inherit from partial (it verifies func is callable)
362 if isinstance(func, partialmethod):
363 # flattening is mandatory in order to place cls/self before all
364 # other arguments
365 # it's also more efficient since only one function will be called
366 self.func = func.func
367 self.args = func.args + args
368 self.keywords = {**func.keywords, **keywords}
369 else:
370 self.func = func
371 self.args = args
372 self.keywords = keywords
373
374 def __repr__(self):
375 args = ", ".join(map(repr, self.args))
376 keywords = ", ".join("{}={!r}".format(k, v)
377 for k, v in self.keywords.items())
378 format_string = "{module}.{cls}({func}, {args}, {keywords})"
379 return format_string.format(module=self.__class__.__module__,
380 cls=self.__class__.__qualname__,
381 func=self.func,
382 args=args,
383 keywords=keywords)
384
385 def _make_unbound_method(self):
386 def _method(cls_or_self, /, *args, **keywords):
387 keywords = {**self.keywords, **keywords}
388 return self.func(cls_or_self, *self.args, *args, **keywords)
389 _method.__isabstractmethod__ = self.__isabstractmethod__
390 _method._partialmethod = self
391 return _method
392
393 def __get__(self, obj, cls=None):
394 get = getattr(self.func, "__get__", None)
395 result = None
396 if get is not None:
397 new_func = get(obj, cls)
398 if new_func is not self.func:
399 # Assume __get__ returning something new indicates the
400 # creation of an appropriate callable
401 result = partial(new_func, *self.args, **self.keywords)
402 try:
403 result.__self__ = new_func.__self__
404 except AttributeError:
405 pass
406 if result is None:
407 # If the underlying descriptor didn't do anything, treat this
408 # like an instance method
409 result = self._make_unbound_method().__get__(obj, cls)
410 return result
411
412 @property
413 def __isabstractmethod__(self):
414 return getattr(self.func, "__isabstractmethod__", False)
415
416 __class_getitem__ = classmethod(GenericAlias)
417
418
419 # Helper functions
420
421 def _unwrap_partial(func):
422 while isinstance(func, partial):
423 func = func.func
424 return func
425
426 ################################################################################
427 ### LRU Cache function decorator
428 ################################################################################
429
430 _CacheInfo = namedtuple("CacheInfo", ["hits", "misses", "maxsize", "currsize"])
431
432 class ESC[4;38;5;81m_HashedSeq(ESC[4;38;5;149mlist):
433 """ This class guarantees that hash() will be called no more than once
434 per element. This is important because the lru_cache() will hash
435 the key multiple times on a cache miss.
436
437 """
438
439 __slots__ = 'hashvalue'
440
441 def __init__(self, tup, hash=hash):
442 self[:] = tup
443 self.hashvalue = hash(tup)
444
445 def __hash__(self):
446 return self.hashvalue
447
448 def _make_key(args, kwds, typed,
449 kwd_mark = (object(),),
450 fasttypes = {int, str},
451 tuple=tuple, type=type, len=len):
452 """Make a cache key from optionally typed positional and keyword arguments
453
454 The key is constructed in a way that is flat as possible rather than
455 as a nested structure that would take more memory.
456
457 If there is only a single argument and its data type is known to cache
458 its hash value, then that argument is returned without a wrapper. This
459 saves space and improves lookup speed.
460
461 """
462 # All of code below relies on kwds preserving the order input by the user.
463 # Formerly, we sorted() the kwds before looping. The new way is *much*
464 # faster; however, it means that f(x=1, y=2) will now be treated as a
465 # distinct call from f(y=2, x=1) which will be cached separately.
466 key = args
467 if kwds:
468 key += kwd_mark
469 for item in kwds.items():
470 key += item
471 if typed:
472 key += tuple(type(v) for v in args)
473 if kwds:
474 key += tuple(type(v) for v in kwds.values())
475 elif len(key) == 1 and type(key[0]) in fasttypes:
476 return key[0]
477 return _HashedSeq(key)
478
479 def lru_cache(maxsize=128, typed=False):
480 """Least-recently-used cache decorator.
481
482 If *maxsize* is set to None, the LRU features are disabled and the cache
483 can grow without bound.
484
485 If *typed* is True, arguments of different types will be cached separately.
486 For example, f(3.0) and f(3) will be treated as distinct calls with
487 distinct results.
488
489 Arguments to the cached function must be hashable.
490
491 View the cache statistics named tuple (hits, misses, maxsize, currsize)
492 with f.cache_info(). Clear the cache and statistics with f.cache_clear().
493 Access the underlying function with f.__wrapped__.
494
495 See: https://en.wikipedia.org/wiki/Cache_replacement_policies#Least_recently_used_(LRU)
496
497 """
498
499 # Users should only access the lru_cache through its public API:
500 # cache_info, cache_clear, and f.__wrapped__
501 # The internals of the lru_cache are encapsulated for thread safety and
502 # to allow the implementation to change (including a possible C version).
503
504 if isinstance(maxsize, int):
505 # Negative maxsize is treated as 0
506 if maxsize < 0:
507 maxsize = 0
508 elif callable(maxsize) and isinstance(typed, bool):
509 # The user_function was passed in directly via the maxsize argument
510 user_function, maxsize = maxsize, 128
511 wrapper = _lru_cache_wrapper(user_function, maxsize, typed, _CacheInfo)
512 wrapper.cache_parameters = lambda : {'maxsize': maxsize, 'typed': typed}
513 return update_wrapper(wrapper, user_function)
514 elif maxsize is not None:
515 raise TypeError(
516 'Expected first argument to be an integer, a callable, or None')
517
518 def decorating_function(user_function):
519 wrapper = _lru_cache_wrapper(user_function, maxsize, typed, _CacheInfo)
520 wrapper.cache_parameters = lambda : {'maxsize': maxsize, 'typed': typed}
521 return update_wrapper(wrapper, user_function)
522
523 return decorating_function
524
525 def _lru_cache_wrapper(user_function, maxsize, typed, _CacheInfo):
526 # Constants shared by all lru cache instances:
527 sentinel = object() # unique object used to signal cache misses
528 make_key = _make_key # build a key from the function arguments
529 PREV, NEXT, KEY, RESULT = 0, 1, 2, 3 # names for the link fields
530
531 cache = {}
532 hits = misses = 0
533 full = False
534 cache_get = cache.get # bound method to lookup a key or return None
535 cache_len = cache.__len__ # get cache size without calling len()
536 lock = RLock() # because linkedlist updates aren't threadsafe
537 root = [] # root of the circular doubly linked list
538 root[:] = [root, root, None, None] # initialize by pointing to self
539
540 if maxsize == 0:
541
542 def wrapper(*args, **kwds):
543 # No caching -- just a statistics update
544 nonlocal misses
545 misses += 1
546 result = user_function(*args, **kwds)
547 return result
548
549 elif maxsize is None:
550
551 def wrapper(*args, **kwds):
552 # Simple caching without ordering or size limit
553 nonlocal hits, misses
554 key = make_key(args, kwds, typed)
555 result = cache_get(key, sentinel)
556 if result is not sentinel:
557 hits += 1
558 return result
559 misses += 1
560 result = user_function(*args, **kwds)
561 cache[key] = result
562 return result
563
564 else:
565
566 def wrapper(*args, **kwds):
567 # Size limited caching that tracks accesses by recency
568 nonlocal root, hits, misses, full
569 key = make_key(args, kwds, typed)
570 with lock:
571 link = cache_get(key)
572 if link is not None:
573 # Move the link to the front of the circular queue
574 link_prev, link_next, _key, result = link
575 link_prev[NEXT] = link_next
576 link_next[PREV] = link_prev
577 last = root[PREV]
578 last[NEXT] = root[PREV] = link
579 link[PREV] = last
580 link[NEXT] = root
581 hits += 1
582 return result
583 misses += 1
584 result = user_function(*args, **kwds)
585 with lock:
586 if key in cache:
587 # Getting here means that this same key was added to the
588 # cache while the lock was released. Since the link
589 # update is already done, we need only return the
590 # computed result and update the count of misses.
591 pass
592 elif full:
593 # Use the old root to store the new key and result.
594 oldroot = root
595 oldroot[KEY] = key
596 oldroot[RESULT] = result
597 # Empty the oldest link and make it the new root.
598 # Keep a reference to the old key and old result to
599 # prevent their ref counts from going to zero during the
600 # update. That will prevent potentially arbitrary object
601 # clean-up code (i.e. __del__) from running while we're
602 # still adjusting the links.
603 root = oldroot[NEXT]
604 oldkey = root[KEY]
605 oldresult = root[RESULT]
606 root[KEY] = root[RESULT] = None
607 # Now update the cache dictionary.
608 del cache[oldkey]
609 # Save the potentially reentrant cache[key] assignment
610 # for last, after the root and links have been put in
611 # a consistent state.
612 cache[key] = oldroot
613 else:
614 # Put result in a new link at the front of the queue.
615 last = root[PREV]
616 link = [last, root, key, result]
617 last[NEXT] = root[PREV] = cache[key] = link
618 # Use the cache_len bound method instead of the len() function
619 # which could potentially be wrapped in an lru_cache itself.
620 full = (cache_len() >= maxsize)
621 return result
622
623 def cache_info():
624 """Report cache statistics"""
625 with lock:
626 return _CacheInfo(hits, misses, maxsize, cache_len())
627
628 def cache_clear():
629 """Clear the cache and cache statistics"""
630 nonlocal hits, misses, full
631 with lock:
632 cache.clear()
633 root[:] = [root, root, None, None]
634 hits = misses = 0
635 full = False
636
637 wrapper.cache_info = cache_info
638 wrapper.cache_clear = cache_clear
639 return wrapper
640
641 try:
642 from _functools import _lru_cache_wrapper
643 except ImportError:
644 pass
645
646
647 ################################################################################
648 ### cache -- simplified access to the infinity cache
649 ################################################################################
650
651 def cache(user_function, /):
652 'Simple lightweight unbounded cache. Sometimes called "memoize".'
653 return lru_cache(maxsize=None)(user_function)
654
655
656 ################################################################################
657 ### singledispatch() - single-dispatch generic function decorator
658 ################################################################################
659
660 def _c3_merge(sequences):
661 """Merges MROs in *sequences* to a single MRO using the C3 algorithm.
662
663 Adapted from https://www.python.org/download/releases/2.3/mro/.
664
665 """
666 result = []
667 while True:
668 sequences = [s for s in sequences if s] # purge empty sequences
669 if not sequences:
670 return result
671 for s1 in sequences: # find merge candidates among seq heads
672 candidate = s1[0]
673 for s2 in sequences:
674 if candidate in s2[1:]:
675 candidate = None
676 break # reject the current head, it appears later
677 else:
678 break
679 if candidate is None:
680 raise RuntimeError("Inconsistent hierarchy")
681 result.append(candidate)
682 # remove the chosen candidate
683 for seq in sequences:
684 if seq[0] == candidate:
685 del seq[0]
686
687 def _c3_mro(cls, abcs=None):
688 """Computes the method resolution order using extended C3 linearization.
689
690 If no *abcs* are given, the algorithm works exactly like the built-in C3
691 linearization used for method resolution.
692
693 If given, *abcs* is a list of abstract base classes that should be inserted
694 into the resulting MRO. Unrelated ABCs are ignored and don't end up in the
695 result. The algorithm inserts ABCs where their functionality is introduced,
696 i.e. issubclass(cls, abc) returns True for the class itself but returns
697 False for all its direct base classes. Implicit ABCs for a given class
698 (either registered or inferred from the presence of a special method like
699 __len__) are inserted directly after the last ABC explicitly listed in the
700 MRO of said class. If two implicit ABCs end up next to each other in the
701 resulting MRO, their ordering depends on the order of types in *abcs*.
702
703 """
704 for i, base in enumerate(reversed(cls.__bases__)):
705 if hasattr(base, '__abstractmethods__'):
706 boundary = len(cls.__bases__) - i
707 break # Bases up to the last explicit ABC are considered first.
708 else:
709 boundary = 0
710 abcs = list(abcs) if abcs else []
711 explicit_bases = list(cls.__bases__[:boundary])
712 abstract_bases = []
713 other_bases = list(cls.__bases__[boundary:])
714 for base in abcs:
715 if issubclass(cls, base) and not any(
716 issubclass(b, base) for b in cls.__bases__
717 ):
718 # If *cls* is the class that introduces behaviour described by
719 # an ABC *base*, insert said ABC to its MRO.
720 abstract_bases.append(base)
721 for base in abstract_bases:
722 abcs.remove(base)
723 explicit_c3_mros = [_c3_mro(base, abcs=abcs) for base in explicit_bases]
724 abstract_c3_mros = [_c3_mro(base, abcs=abcs) for base in abstract_bases]
725 other_c3_mros = [_c3_mro(base, abcs=abcs) for base in other_bases]
726 return _c3_merge(
727 [[cls]] +
728 explicit_c3_mros + abstract_c3_mros + other_c3_mros +
729 [explicit_bases] + [abstract_bases] + [other_bases]
730 )
731
732 def _compose_mro(cls, types):
733 """Calculates the method resolution order for a given class *cls*.
734
735 Includes relevant abstract base classes (with their respective bases) from
736 the *types* iterable. Uses a modified C3 linearization algorithm.
737
738 """
739 bases = set(cls.__mro__)
740 # Remove entries which are already present in the __mro__ or unrelated.
741 def is_related(typ):
742 return (typ not in bases and hasattr(typ, '__mro__')
743 and not isinstance(typ, GenericAlias)
744 and issubclass(cls, typ))
745 types = [n for n in types if is_related(n)]
746 # Remove entries which are strict bases of other entries (they will end up
747 # in the MRO anyway.
748 def is_strict_base(typ):
749 for other in types:
750 if typ != other and typ in other.__mro__:
751 return True
752 return False
753 types = [n for n in types if not is_strict_base(n)]
754 # Subclasses of the ABCs in *types* which are also implemented by
755 # *cls* can be used to stabilize ABC ordering.
756 type_set = set(types)
757 mro = []
758 for typ in types:
759 found = []
760 for sub in typ.__subclasses__():
761 if sub not in bases and issubclass(cls, sub):
762 found.append([s for s in sub.__mro__ if s in type_set])
763 if not found:
764 mro.append(typ)
765 continue
766 # Favor subclasses with the biggest number of useful bases
767 found.sort(key=len, reverse=True)
768 for sub in found:
769 for subcls in sub:
770 if subcls not in mro:
771 mro.append(subcls)
772 return _c3_mro(cls, abcs=mro)
773
774 def _find_impl(cls, registry):
775 """Returns the best matching implementation from *registry* for type *cls*.
776
777 Where there is no registered implementation for a specific type, its method
778 resolution order is used to find a more generic implementation.
779
780 Note: if *registry* does not contain an implementation for the base
781 *object* type, this function may return None.
782
783 """
784 mro = _compose_mro(cls, registry.keys())
785 match = None
786 for t in mro:
787 if match is not None:
788 # If *match* is an implicit ABC but there is another unrelated,
789 # equally matching implicit ABC, refuse the temptation to guess.
790 if (t in registry and t not in cls.__mro__
791 and match not in cls.__mro__
792 and not issubclass(match, t)):
793 raise RuntimeError("Ambiguous dispatch: {} or {}".format(
794 match, t))
795 break
796 if t in registry:
797 match = t
798 return registry.get(match)
799
800 def singledispatch(func):
801 """Single-dispatch generic function decorator.
802
803 Transforms a function into a generic function, which can have different
804 behaviours depending upon the type of its first argument. The decorated
805 function acts as the default implementation, and additional
806 implementations can be registered using the register() attribute of the
807 generic function.
808 """
809 # There are many programs that use functools without singledispatch, so we
810 # trade-off making singledispatch marginally slower for the benefit of
811 # making start-up of such applications slightly faster.
812 import types, weakref
813
814 registry = {}
815 dispatch_cache = weakref.WeakKeyDictionary()
816 cache_token = None
817
818 def dispatch(cls):
819 """generic_func.dispatch(cls) -> <function implementation>
820
821 Runs the dispatch algorithm to return the best available implementation
822 for the given *cls* registered on *generic_func*.
823
824 """
825 nonlocal cache_token
826 if cache_token is not None:
827 current_token = get_cache_token()
828 if cache_token != current_token:
829 dispatch_cache.clear()
830 cache_token = current_token
831 try:
832 impl = dispatch_cache[cls]
833 except KeyError:
834 try:
835 impl = registry[cls]
836 except KeyError:
837 impl = _find_impl(cls, registry)
838 dispatch_cache[cls] = impl
839 return impl
840
841 def _is_union_type(cls):
842 from typing import get_origin, Union
843 return get_origin(cls) in {Union, types.UnionType}
844
845 def _is_valid_dispatch_type(cls):
846 if isinstance(cls, type):
847 return True
848 from typing import get_args
849 return (_is_union_type(cls) and
850 all(isinstance(arg, type) for arg in get_args(cls)))
851
852 def register(cls, func=None):
853 """generic_func.register(cls, func) -> func
854
855 Registers a new implementation for the given *cls* on a *generic_func*.
856
857 """
858 nonlocal cache_token
859 if _is_valid_dispatch_type(cls):
860 if func is None:
861 return lambda f: register(cls, f)
862 else:
863 if func is not None:
864 raise TypeError(
865 f"Invalid first argument to `register()`. "
866 f"{cls!r} is not a class or union type."
867 )
868 ann = getattr(cls, '__annotations__', {})
869 if not ann:
870 raise TypeError(
871 f"Invalid first argument to `register()`: {cls!r}. "
872 f"Use either `@register(some_class)` or plain `@register` "
873 f"on an annotated function."
874 )
875 func = cls
876
877 # only import typing if annotation parsing is necessary
878 from typing import get_type_hints
879 argname, cls = next(iter(get_type_hints(func).items()))
880 if not _is_valid_dispatch_type(cls):
881 if _is_union_type(cls):
882 raise TypeError(
883 f"Invalid annotation for {argname!r}. "
884 f"{cls!r} not all arguments are classes."
885 )
886 else:
887 raise TypeError(
888 f"Invalid annotation for {argname!r}. "
889 f"{cls!r} is not a class."
890 )
891
892 if _is_union_type(cls):
893 from typing import get_args
894
895 for arg in get_args(cls):
896 registry[arg] = func
897 else:
898 registry[cls] = func
899 if cache_token is None and hasattr(cls, '__abstractmethods__'):
900 cache_token = get_cache_token()
901 dispatch_cache.clear()
902 return func
903
904 def wrapper(*args, **kw):
905 if not args:
906 raise TypeError(f'{funcname} requires at least '
907 '1 positional argument')
908
909 return dispatch(args[0].__class__)(*args, **kw)
910
911 funcname = getattr(func, '__name__', 'singledispatch function')
912 registry[object] = func
913 wrapper.register = register
914 wrapper.dispatch = dispatch
915 wrapper.registry = types.MappingProxyType(registry)
916 wrapper._clear_cache = dispatch_cache.clear
917 update_wrapper(wrapper, func)
918 return wrapper
919
920
921 # Descriptor version
922 class ESC[4;38;5;81msingledispatchmethod:
923 """Single-dispatch generic method descriptor.
924
925 Supports wrapping existing descriptors and handles non-descriptor
926 callables as instance methods.
927 """
928
929 def __init__(self, func):
930 if not callable(func) and not hasattr(func, "__get__"):
931 raise TypeError(f"{func!r} is not callable or a descriptor")
932
933 self.dispatcher = singledispatch(func)
934 self.func = func
935
936 def register(self, cls, method=None):
937 """generic_method.register(cls, func) -> func
938
939 Registers a new implementation for the given *cls* on a *generic_method*.
940 """
941 return self.dispatcher.register(cls, func=method)
942
943 def __get__(self, obj, cls=None):
944 def _method(*args, **kwargs):
945 method = self.dispatcher.dispatch(args[0].__class__)
946 return method.__get__(obj, cls)(*args, **kwargs)
947
948 _method.__isabstractmethod__ = self.__isabstractmethod__
949 _method.register = self.register
950 update_wrapper(_method, self.func)
951 return _method
952
953 @property
954 def __isabstractmethod__(self):
955 return getattr(self.func, '__isabstractmethod__', False)
956
957
958 ################################################################################
959 ### cached_property() - property result cached as instance attribute
960 ################################################################################
961
962 _NOT_FOUND = object()
963
964 class ESC[4;38;5;81mcached_property:
965 def __init__(self, func):
966 self.func = func
967 self.attrname = None
968 self.__doc__ = func.__doc__
969
970 def __set_name__(self, owner, name):
971 if self.attrname is None:
972 self.attrname = name
973 elif name != self.attrname:
974 raise TypeError(
975 "Cannot assign the same cached_property to two different names "
976 f"({self.attrname!r} and {name!r})."
977 )
978
979 def __get__(self, instance, owner=None):
980 if instance is None:
981 return self
982 if self.attrname is None:
983 raise TypeError(
984 "Cannot use cached_property instance without calling __set_name__ on it.")
985 try:
986 cache = instance.__dict__
987 except AttributeError: # not all objects have __dict__ (e.g. class defines slots)
988 msg = (
989 f"No '__dict__' attribute on {type(instance).__name__!r} "
990 f"instance to cache {self.attrname!r} property."
991 )
992 raise TypeError(msg) from None
993 val = cache.get(self.attrname, _NOT_FOUND)
994 if val is _NOT_FOUND:
995 val = self.func(instance)
996 try:
997 cache[self.attrname] = val
998 except TypeError:
999 msg = (
1000 f"The '__dict__' attribute on {type(instance).__name__!r} instance "
1001 f"does not support item assignment for caching {self.attrname!r} property."
1002 )
1003 raise TypeError(msg) from None
1004 return val
1005
1006 __class_getitem__ = classmethod(GenericAlias)