1"""Event loop mixins.""" 2 3import threading 4from . import events 5 6_global_lock = threading.Lock() 7 8# Used as a sentinel for loop parameter 9_marker = object() 10 11 12class _LoopBoundMixin: 13 _loop = None 14 15 def __init__(self, *, loop=_marker): 16 if loop is not _marker: 17 raise TypeError( 18 f'As of 3.10, the *loop* parameter was removed from ' 19 f'{type(self).__name__}() since it is no longer necessary' 20 ) 21 22 def _get_loop(self): 23 loop = events._get_running_loop() 24 25 if self._loop is None: 26 with _global_lock: 27 if self._loop is None: 28 self._loop = loop 29 if loop is not self._loop: 30 raise RuntimeError(f'{self!r} is bound to a different event loop') 31 return loop 32