1from _weakrefset import WeakSet 2 3 4def get_cache_token(): 5 """Returns the current ABC cache token. 6 7 The token is an opaque object (supporting equality testing) identifying the 8 current version of the ABC cache for virtual subclasses. The token changes 9 with every call to ``register()`` on any ABC. 10 """ 11 return ABCMeta._abc_invalidation_counter 12 13 14class ABCMeta(type): 15 """Metaclass for defining Abstract Base Classes (ABCs). 16 17 Use this metaclass to create an ABC. An ABC can be subclassed 18 directly, and then acts as a mix-in class. You can also register 19 unrelated concrete classes (even built-in classes) and unrelated 20 ABCs as 'virtual subclasses' -- these and their descendants will 21 be considered subclasses of the registering ABC by the built-in 22 issubclass() function, but the registering ABC won't show up in 23 their MRO (Method Resolution Order) nor will method 24 implementations defined by the registering ABC be callable (not 25 even via super()). 26 """ 27 28 # A global counter that is incremented each time a class is 29 # registered as a virtual subclass of anything. It forces the 30 # negative cache to be cleared before its next use. 31 # Note: this counter is private. Use `abc.get_cache_token()` for 32 # external code. 33 _abc_invalidation_counter = 0 34 35 def __new__(mcls, name, bases, namespace, **kwargs): 36 cls = super().__new__(mcls, name, bases, namespace, **kwargs) 37 # Compute set of abstract method names 38 abstracts = {name 39 for name, value in namespace.items() 40 if getattr(value, "__isabstractmethod__", False)} 41 for base in bases: 42 for name in getattr(base, "__abstractmethods__", set()): 43 value = getattr(cls, name, None) 44 if getattr(value, "__isabstractmethod__", False): 45 abstracts.add(name) 46 cls.__abstractmethods__ = frozenset(abstracts) 47 # Set up inheritance registry 48 cls._abc_registry = WeakSet() 49 cls._abc_cache = WeakSet() 50 cls._abc_negative_cache = WeakSet() 51 cls._abc_negative_cache_version = ABCMeta._abc_invalidation_counter 52 return cls 53 54 def register(cls, subclass): 55 """Register a virtual subclass of an ABC. 56 57 Returns the subclass, to allow usage as a class decorator. 58 """ 59 if not isinstance(subclass, type): 60 raise TypeError("Can only register classes") 61 if issubclass(subclass, cls): 62 return subclass # Already a subclass 63 # Subtle: test for cycles *after* testing for "already a subclass"; 64 # this means we allow X.register(X) and interpret it as a no-op. 65 if issubclass(cls, subclass): 66 # This would create a cycle, which is bad for the algorithm below 67 raise RuntimeError("Refusing to create an inheritance cycle") 68 cls._abc_registry.add(subclass) 69 ABCMeta._abc_invalidation_counter += 1 # Invalidate negative cache 70 return subclass 71 72 def _dump_registry(cls, file=None): 73 """Debug helper to print the ABC registry.""" 74 print(f"Class: {cls.__module__}.{cls.__qualname__}", file=file) 75 print(f"Inv. counter: {get_cache_token()}", file=file) 76 for name in cls.__dict__: 77 if name.startswith("_abc_"): 78 value = getattr(cls, name) 79 if isinstance(value, WeakSet): 80 value = set(value) 81 print(f"{name}: {value!r}", file=file) 82 83 def _abc_registry_clear(cls): 84 """Clear the registry (for debugging or testing).""" 85 cls._abc_registry.clear() 86 87 def _abc_caches_clear(cls): 88 """Clear the caches (for debugging or testing).""" 89 cls._abc_cache.clear() 90 cls._abc_negative_cache.clear() 91 92 def __instancecheck__(cls, instance): 93 """Override for isinstance(instance, cls).""" 94 # Inline the cache checking 95 subclass = instance.__class__ 96 if subclass in cls._abc_cache: 97 return True 98 subtype = type(instance) 99 if subtype is subclass: 100 if (cls._abc_negative_cache_version == 101 ABCMeta._abc_invalidation_counter and 102 subclass in cls._abc_negative_cache): 103 return False 104 # Fall back to the subclass check. 105 return cls.__subclasscheck__(subclass) 106 return any(cls.__subclasscheck__(c) for c in (subclass, subtype)) 107 108 def __subclasscheck__(cls, subclass): 109 """Override for issubclass(subclass, cls).""" 110 if not isinstance(subclass, type): 111 raise TypeError('issubclass() arg 1 must be a class') 112 # Check cache 113 if subclass in cls._abc_cache: 114 return True 115 # Check negative cache; may have to invalidate 116 if cls._abc_negative_cache_version < ABCMeta._abc_invalidation_counter: 117 # Invalidate the negative cache 118 cls._abc_negative_cache = WeakSet() 119 cls._abc_negative_cache_version = ABCMeta._abc_invalidation_counter 120 elif subclass in cls._abc_negative_cache: 121 return False 122 # Check the subclass hook 123 ok = cls.__subclasshook__(subclass) 124 if ok is not NotImplemented: 125 assert isinstance(ok, bool) 126 if ok: 127 cls._abc_cache.add(subclass) 128 else: 129 cls._abc_negative_cache.add(subclass) 130 return ok 131 # Check if it's a direct subclass 132 if cls in getattr(subclass, '__mro__', ()): 133 cls._abc_cache.add(subclass) 134 return True 135 # Check if it's a subclass of a registered class (recursive) 136 for rcls in cls._abc_registry: 137 if issubclass(subclass, rcls): 138 cls._abc_cache.add(subclass) 139 return True 140 # Check if it's a subclass of a subclass (recursive) 141 for scls in cls.__subclasses__(): 142 if issubclass(subclass, scls): 143 cls._abc_cache.add(subclass) 144 return True 145 # No dice; update negative cache 146 cls._abc_negative_cache.add(subclass) 147 return False 148