1"""Event loop and event loop policy.""" 2 3__all__ = ['AbstractEventLoopPolicy', 4 'AbstractEventLoop', 'AbstractServer', 5 'Handle', 'TimerHandle', 6 'get_event_loop_policy', 'set_event_loop_policy', 7 'get_event_loop', 'set_event_loop', 'new_event_loop', 8 'get_child_watcher', 'set_child_watcher', 9 '_set_running_loop', '_get_running_loop', 10 ] 11 12import functools 13import inspect 14import os 15import reprlib 16import socket 17import subprocess 18import sys 19import threading 20import traceback 21 22from asyncio import compat 23 24 25def _get_function_source(func): 26 if compat.PY34: 27 func = inspect.unwrap(func) 28 elif hasattr(func, '__wrapped__'): 29 func = func.__wrapped__ 30 if inspect.isfunction(func): 31 code = func.__code__ 32 return (code.co_filename, code.co_firstlineno) 33 if isinstance(func, functools.partial): 34 return _get_function_source(func.func) 35 if compat.PY34 and isinstance(func, functools.partialmethod): 36 return _get_function_source(func.func) 37 return None 38 39 40def _format_args_and_kwargs(args, kwargs): 41 """Format function arguments and keyword arguments. 42 43 Special case for a single parameter: ('hello',) is formatted as ('hello'). 44 """ 45 # use reprlib to limit the length of the output 46 items = [] 47 if args: 48 items.extend(reprlib.repr(arg) for arg in args) 49 if kwargs: 50 items.extend('{}={}'.format(k, reprlib.repr(v)) 51 for k, v in kwargs.items()) 52 return '(' + ', '.join(items) + ')' 53 54 55def _format_callback(func, args, kwargs, suffix=''): 56 if isinstance(func, functools.partial): 57 suffix = _format_args_and_kwargs(args, kwargs) + suffix 58 return _format_callback(func.func, func.args, func.keywords, suffix) 59 60 if hasattr(func, '__qualname__'): 61 func_repr = getattr(func, '__qualname__') 62 elif hasattr(func, '__name__'): 63 func_repr = getattr(func, '__name__') 64 else: 65 func_repr = repr(func) 66 67 func_repr += _format_args_and_kwargs(args, kwargs) 68 if suffix: 69 func_repr += suffix 70 return func_repr 71 72def _format_callback_source(func, args): 73 func_repr = _format_callback(func, args, None) 74 source = _get_function_source(func) 75 if source: 76 func_repr += ' at %s:%s' % source 77 return func_repr 78 79 80class Handle: 81 """Object returned by callback registration methods.""" 82 83 __slots__ = ('_callback', '_args', '_cancelled', '_loop', 84 '_source_traceback', '_repr', '__weakref__') 85 86 def __init__(self, callback, args, loop): 87 self._loop = loop 88 self._callback = callback 89 self._args = args 90 self._cancelled = False 91 self._repr = None 92 if self._loop.get_debug(): 93 self._source_traceback = traceback.extract_stack(sys._getframe(1)) 94 else: 95 self._source_traceback = None 96 97 def _repr_info(self): 98 info = [self.__class__.__name__] 99 if self._cancelled: 100 info.append('cancelled') 101 if self._callback is not None: 102 info.append(_format_callback_source(self._callback, self._args)) 103 if self._source_traceback: 104 frame = self._source_traceback[-1] 105 info.append('created at %s:%s' % (frame[0], frame[1])) 106 return info 107 108 def __repr__(self): 109 if self._repr is not None: 110 return self._repr 111 info = self._repr_info() 112 return '<%s>' % ' '.join(info) 113 114 def cancel(self): 115 if not self._cancelled: 116 self._cancelled = True 117 if self._loop.get_debug(): 118 # Keep a representation in debug mode to keep callback and 119 # parameters. For example, to log the warning 120 # "Executing <Handle...> took 2.5 second" 121 self._repr = repr(self) 122 self._callback = None 123 self._args = None 124 125 def _run(self): 126 try: 127 self._callback(*self._args) 128 except Exception as exc: 129 cb = _format_callback_source(self._callback, self._args) 130 msg = 'Exception in callback {}'.format(cb) 131 context = { 132 'message': msg, 133 'exception': exc, 134 'handle': self, 135 } 136 if self._source_traceback: 137 context['source_traceback'] = self._source_traceback 138 self._loop.call_exception_handler(context) 139 self = None # Needed to break cycles when an exception occurs. 140 141 142class TimerHandle(Handle): 143 """Object returned by timed callback registration methods.""" 144 145 __slots__ = ['_scheduled', '_when'] 146 147 def __init__(self, when, callback, args, loop): 148 assert when is not None 149 super().__init__(callback, args, loop) 150 if self._source_traceback: 151 del self._source_traceback[-1] 152 self._when = when 153 self._scheduled = False 154 155 def _repr_info(self): 156 info = super()._repr_info() 157 pos = 2 if self._cancelled else 1 158 info.insert(pos, 'when=%s' % self._when) 159 return info 160 161 def __hash__(self): 162 return hash(self._when) 163 164 def __lt__(self, other): 165 return self._when < other._when 166 167 def __le__(self, other): 168 if self._when < other._when: 169 return True 170 return self.__eq__(other) 171 172 def __gt__(self, other): 173 return self._when > other._when 174 175 def __ge__(self, other): 176 if self._when > other._when: 177 return True 178 return self.__eq__(other) 179 180 def __eq__(self, other): 181 if isinstance(other, TimerHandle): 182 return (self._when == other._when and 183 self._callback == other._callback and 184 self._args == other._args and 185 self._cancelled == other._cancelled) 186 return NotImplemented 187 188 def __ne__(self, other): 189 equal = self.__eq__(other) 190 return NotImplemented if equal is NotImplemented else not equal 191 192 def cancel(self): 193 if not self._cancelled: 194 self._loop._timer_handle_cancelled(self) 195 super().cancel() 196 197 198class AbstractServer: 199 """Abstract server returned by create_server().""" 200 201 def close(self): 202 """Stop serving. This leaves existing connections open.""" 203 return NotImplemented 204 205 def wait_closed(self): 206 """Coroutine to wait until service is closed.""" 207 return NotImplemented 208 209 210class AbstractEventLoop: 211 """Abstract event loop.""" 212 213 # Running and stopping the event loop. 214 215 def run_forever(self): 216 """Run the event loop until stop() is called.""" 217 raise NotImplementedError 218 219 def run_until_complete(self, future): 220 """Run the event loop until a Future is done. 221 222 Return the Future's result, or raise its exception. 223 """ 224 raise NotImplementedError 225 226 def stop(self): 227 """Stop the event loop as soon as reasonable. 228 229 Exactly how soon that is may depend on the implementation, but 230 no more I/O callbacks should be scheduled. 231 """ 232 raise NotImplementedError 233 234 def is_running(self): 235 """Return whether the event loop is currently running.""" 236 raise NotImplementedError 237 238 def is_closed(self): 239 """Returns True if the event loop was closed.""" 240 raise NotImplementedError 241 242 def close(self): 243 """Close the loop. 244 245 The loop should not be running. 246 247 This is idempotent and irreversible. 248 249 No other methods should be called after this one. 250 """ 251 raise NotImplementedError 252 253 def shutdown_asyncgens(self): 254 """Shutdown all active asynchronous generators.""" 255 raise NotImplementedError 256 257 # Methods scheduling callbacks. All these return Handles. 258 259 def _timer_handle_cancelled(self, handle): 260 """Notification that a TimerHandle has been cancelled.""" 261 raise NotImplementedError 262 263 def call_soon(self, callback, *args): 264 return self.call_later(0, callback, *args) 265 266 def call_later(self, delay, callback, *args): 267 raise NotImplementedError 268 269 def call_at(self, when, callback, *args): 270 raise NotImplementedError 271 272 def time(self): 273 raise NotImplementedError 274 275 def create_future(self): 276 raise NotImplementedError 277 278 # Method scheduling a coroutine object: create a task. 279 280 def create_task(self, coro): 281 raise NotImplementedError 282 283 # Methods for interacting with threads. 284 285 def call_soon_threadsafe(self, callback, *args): 286 raise NotImplementedError 287 288 def run_in_executor(self, executor, func, *args): 289 raise NotImplementedError 290 291 def set_default_executor(self, executor): 292 raise NotImplementedError 293 294 # Network I/O methods returning Futures. 295 296 def getaddrinfo(self, host, port, *, family=0, type=0, proto=0, flags=0): 297 raise NotImplementedError 298 299 def getnameinfo(self, sockaddr, flags=0): 300 raise NotImplementedError 301 302 def create_connection(self, protocol_factory, host=None, port=None, *, 303 ssl=None, family=0, proto=0, flags=0, sock=None, 304 local_addr=None, server_hostname=None): 305 raise NotImplementedError 306 307 def create_server(self, protocol_factory, host=None, port=None, *, 308 family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, 309 sock=None, backlog=100, ssl=None, reuse_address=None, 310 reuse_port=None): 311 """A coroutine which creates a TCP server bound to host and port. 312 313 The return value is a Server object which can be used to stop 314 the service. 315 316 If host is an empty string or None all interfaces are assumed 317 and a list of multiple sockets will be returned (most likely 318 one for IPv4 and another one for IPv6). The host parameter can also be a 319 sequence (e.g. list) of hosts to bind to. 320 321 family can be set to either AF_INET or AF_INET6 to force the 322 socket to use IPv4 or IPv6. If not set it will be determined 323 from host (defaults to AF_UNSPEC). 324 325 flags is a bitmask for getaddrinfo(). 326 327 sock can optionally be specified in order to use a preexisting 328 socket object. 329 330 backlog is the maximum number of queued connections passed to 331 listen() (defaults to 100). 332 333 ssl can be set to an SSLContext to enable SSL over the 334 accepted connections. 335 336 reuse_address tells the kernel to reuse a local socket in 337 TIME_WAIT state, without waiting for its natural timeout to 338 expire. If not specified will automatically be set to True on 339 UNIX. 340 341 reuse_port tells the kernel to allow this endpoint to be bound to 342 the same port as other existing endpoints are bound to, so long as 343 they all set this flag when being created. This option is not 344 supported on Windows. 345 """ 346 raise NotImplementedError 347 348 def create_unix_connection(self, protocol_factory, path, *, 349 ssl=None, sock=None, 350 server_hostname=None): 351 raise NotImplementedError 352 353 def create_unix_server(self, protocol_factory, path, *, 354 sock=None, backlog=100, ssl=None): 355 """A coroutine which creates a UNIX Domain Socket server. 356 357 The return value is a Server object, which can be used to stop 358 the service. 359 360 path is a str, representing a file systsem path to bind the 361 server socket to. 362 363 sock can optionally be specified in order to use a preexisting 364 socket object. 365 366 backlog is the maximum number of queued connections passed to 367 listen() (defaults to 100). 368 369 ssl can be set to an SSLContext to enable SSL over the 370 accepted connections. 371 """ 372 raise NotImplementedError 373 374 def create_datagram_endpoint(self, protocol_factory, 375 local_addr=None, remote_addr=None, *, 376 family=0, proto=0, flags=0, 377 reuse_address=None, reuse_port=None, 378 allow_broadcast=None, sock=None): 379 """A coroutine which creates a datagram endpoint. 380 381 This method will try to establish the endpoint in the background. 382 When successful, the coroutine returns a (transport, protocol) pair. 383 384 protocol_factory must be a callable returning a protocol instance. 385 386 socket family AF_INET or socket.AF_INET6 depending on host (or 387 family if specified), socket type SOCK_DGRAM. 388 389 reuse_address tells the kernel to reuse a local socket in 390 TIME_WAIT state, without waiting for its natural timeout to 391 expire. If not specified it will automatically be set to True on 392 UNIX. 393 394 reuse_port tells the kernel to allow this endpoint to be bound to 395 the same port as other existing endpoints are bound to, so long as 396 they all set this flag when being created. This option is not 397 supported on Windows and some UNIX's. If the 398 :py:data:`~socket.SO_REUSEPORT` constant is not defined then this 399 capability is unsupported. 400 401 allow_broadcast tells the kernel to allow this endpoint to send 402 messages to the broadcast address. 403 404 sock can optionally be specified in order to use a preexisting 405 socket object. 406 """ 407 raise NotImplementedError 408 409 # Pipes and subprocesses. 410 411 def connect_read_pipe(self, protocol_factory, pipe): 412 """Register read pipe in event loop. Set the pipe to non-blocking mode. 413 414 protocol_factory should instantiate object with Protocol interface. 415 pipe is a file-like object. 416 Return pair (transport, protocol), where transport supports the 417 ReadTransport interface.""" 418 # The reason to accept file-like object instead of just file descriptor 419 # is: we need to own pipe and close it at transport finishing 420 # Can got complicated errors if pass f.fileno(), 421 # close fd in pipe transport then close f and vise versa. 422 raise NotImplementedError 423 424 def connect_write_pipe(self, protocol_factory, pipe): 425 """Register write pipe in event loop. 426 427 protocol_factory should instantiate object with BaseProtocol interface. 428 Pipe is file-like object already switched to nonblocking. 429 Return pair (transport, protocol), where transport support 430 WriteTransport interface.""" 431 # The reason to accept file-like object instead of just file descriptor 432 # is: we need to own pipe and close it at transport finishing 433 # Can got complicated errors if pass f.fileno(), 434 # close fd in pipe transport then close f and vise versa. 435 raise NotImplementedError 436 437 def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE, 438 stdout=subprocess.PIPE, stderr=subprocess.PIPE, 439 **kwargs): 440 raise NotImplementedError 441 442 def subprocess_exec(self, protocol_factory, *args, stdin=subprocess.PIPE, 443 stdout=subprocess.PIPE, stderr=subprocess.PIPE, 444 **kwargs): 445 raise NotImplementedError 446 447 # Ready-based callback registration methods. 448 # The add_*() methods return None. 449 # The remove_*() methods return True if something was removed, 450 # False if there was nothing to delete. 451 452 def add_reader(self, fd, callback, *args): 453 raise NotImplementedError 454 455 def remove_reader(self, fd): 456 raise NotImplementedError 457 458 def add_writer(self, fd, callback, *args): 459 raise NotImplementedError 460 461 def remove_writer(self, fd): 462 raise NotImplementedError 463 464 # Completion based I/O methods returning Futures. 465 466 def sock_recv(self, sock, nbytes): 467 raise NotImplementedError 468 469 def sock_sendall(self, sock, data): 470 raise NotImplementedError 471 472 def sock_connect(self, sock, address): 473 raise NotImplementedError 474 475 def sock_accept(self, sock): 476 raise NotImplementedError 477 478 # Signal handling. 479 480 def add_signal_handler(self, sig, callback, *args): 481 raise NotImplementedError 482 483 def remove_signal_handler(self, sig): 484 raise NotImplementedError 485 486 # Task factory. 487 488 def set_task_factory(self, factory): 489 raise NotImplementedError 490 491 def get_task_factory(self): 492 raise NotImplementedError 493 494 # Error handlers. 495 496 def get_exception_handler(self): 497 raise NotImplementedError 498 499 def set_exception_handler(self, handler): 500 raise NotImplementedError 501 502 def default_exception_handler(self, context): 503 raise NotImplementedError 504 505 def call_exception_handler(self, context): 506 raise NotImplementedError 507 508 # Debug flag management. 509 510 def get_debug(self): 511 raise NotImplementedError 512 513 def set_debug(self, enabled): 514 raise NotImplementedError 515 516 517class AbstractEventLoopPolicy: 518 """Abstract policy for accessing the event loop.""" 519 520 def get_event_loop(self): 521 """Get the event loop for the current context. 522 523 Returns an event loop object implementing the BaseEventLoop interface, 524 or raises an exception in case no event loop has been set for the 525 current context and the current policy does not specify to create one. 526 527 It should never return None.""" 528 raise NotImplementedError 529 530 def set_event_loop(self, loop): 531 """Set the event loop for the current context to loop.""" 532 raise NotImplementedError 533 534 def new_event_loop(self): 535 """Create and return a new event loop object according to this 536 policy's rules. If there's need to set this loop as the event loop for 537 the current context, set_event_loop must be called explicitly.""" 538 raise NotImplementedError 539 540 # Child processes handling (Unix only). 541 542 def get_child_watcher(self): 543 "Get the watcher for child processes." 544 raise NotImplementedError 545 546 def set_child_watcher(self, watcher): 547 """Set the watcher for child processes.""" 548 raise NotImplementedError 549 550 551class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy): 552 """Default policy implementation for accessing the event loop. 553 554 In this policy, each thread has its own event loop. However, we 555 only automatically create an event loop by default for the main 556 thread; other threads by default have no event loop. 557 558 Other policies may have different rules (e.g. a single global 559 event loop, or automatically creating an event loop per thread, or 560 using some other notion of context to which an event loop is 561 associated). 562 """ 563 564 _loop_factory = None 565 566 class _Local(threading.local): 567 _loop = None 568 _set_called = False 569 570 def __init__(self): 571 self._local = self._Local() 572 573 def get_event_loop(self): 574 """Get the event loop. 575 576 This may be None or an instance of EventLoop. 577 """ 578 if (self._local._loop is None and 579 not self._local._set_called and 580 isinstance(threading.current_thread(), threading._MainThread)): 581 self.set_event_loop(self.new_event_loop()) 582 if self._local._loop is None: 583 raise RuntimeError('There is no current event loop in thread %r.' 584 % threading.current_thread().name) 585 return self._local._loop 586 587 def set_event_loop(self, loop): 588 """Set the event loop.""" 589 self._local._set_called = True 590 assert loop is None or isinstance(loop, AbstractEventLoop) 591 self._local._loop = loop 592 593 def new_event_loop(self): 594 """Create a new event loop. 595 596 You must call set_event_loop() to make this the current event 597 loop. 598 """ 599 return self._loop_factory() 600 601 602# Event loop policy. The policy itself is always global, even if the 603# policy's rules say that there is an event loop per thread (or other 604# notion of context). The default policy is installed by the first 605# call to get_event_loop_policy(). 606_event_loop_policy = None 607 608# Lock for protecting the on-the-fly creation of the event loop policy. 609_lock = threading.Lock() 610 611 612# A TLS for the running event loop, used by _get_running_loop. 613class _RunningLoop(threading.local): 614 _loop = None 615 _pid = None 616 617 618_running_loop = _RunningLoop() 619 620 621def _get_running_loop(): 622 """Return the running event loop or None. 623 624 This is a low-level function intended to be used by event loops. 625 This function is thread-specific. 626 """ 627 running_loop = _running_loop._loop 628 if running_loop is not None and _running_loop._pid == os.getpid(): 629 return running_loop 630 631 632def _set_running_loop(loop): 633 """Set the running event loop. 634 635 This is a low-level function intended to be used by event loops. 636 This function is thread-specific. 637 """ 638 _running_loop._pid = os.getpid() 639 _running_loop._loop = loop 640 641 642def _init_event_loop_policy(): 643 global _event_loop_policy 644 with _lock: 645 if _event_loop_policy is None: # pragma: no branch 646 from . import DefaultEventLoopPolicy 647 _event_loop_policy = DefaultEventLoopPolicy() 648 649 650def get_event_loop_policy(): 651 """Get the current event loop policy.""" 652 if _event_loop_policy is None: 653 _init_event_loop_policy() 654 return _event_loop_policy 655 656 657def set_event_loop_policy(policy): 658 """Set the current event loop policy. 659 660 If policy is None, the default policy is restored.""" 661 global _event_loop_policy 662 assert policy is None or isinstance(policy, AbstractEventLoopPolicy) 663 _event_loop_policy = policy 664 665 666def get_event_loop(): 667 """Return an asyncio event loop. 668 669 When called from a coroutine or a callback (e.g. scheduled with call_soon 670 or similar API), this function will always return the running event loop. 671 672 If there is no running event loop set, the function will return 673 the result of `get_event_loop_policy().get_event_loop()` call. 674 """ 675 current_loop = _get_running_loop() 676 if current_loop is not None: 677 return current_loop 678 return get_event_loop_policy().get_event_loop() 679 680 681def set_event_loop(loop): 682 """Equivalent to calling get_event_loop_policy().set_event_loop(loop).""" 683 get_event_loop_policy().set_event_loop(loop) 684 685 686def new_event_loop(): 687 """Equivalent to calling get_event_loop_policy().new_event_loop().""" 688 return get_event_loop_policy().new_event_loop() 689 690 691def get_child_watcher(): 692 """Equivalent to calling get_event_loop_policy().get_child_watcher().""" 693 return get_event_loop_policy().get_child_watcher() 694 695 696def set_child_watcher(watcher): 697 """Equivalent to calling 698 get_event_loop_policy().set_child_watcher(watcher).""" 699 return get_event_loop_policy().set_child_watcher(watcher) 700