1# Copyright 2009 Brian Quinlan. All Rights Reserved. 2# Licensed to PSF under a Contributor Agreement. 3 4__author__ = 'Brian Quinlan (brian@sweetapp.com)' 5 6import collections 7import logging 8import threading 9import time 10import types 11 12FIRST_COMPLETED = 'FIRST_COMPLETED' 13FIRST_EXCEPTION = 'FIRST_EXCEPTION' 14ALL_COMPLETED = 'ALL_COMPLETED' 15_AS_COMPLETED = '_AS_COMPLETED' 16 17# Possible future states (for internal use by the futures package). 18PENDING = 'PENDING' 19RUNNING = 'RUNNING' 20# The future was cancelled by the user... 21CANCELLED = 'CANCELLED' 22# ...and _Waiter.add_cancelled() was called by a worker. 23CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED' 24FINISHED = 'FINISHED' 25 26_FUTURE_STATES = [ 27 PENDING, 28 RUNNING, 29 CANCELLED, 30 CANCELLED_AND_NOTIFIED, 31 FINISHED 32] 33 34_STATE_TO_DESCRIPTION_MAP = { 35 PENDING: "pending", 36 RUNNING: "running", 37 CANCELLED: "cancelled", 38 CANCELLED_AND_NOTIFIED: "cancelled", 39 FINISHED: "finished" 40} 41 42# Logger for internal use by the futures package. 43LOGGER = logging.getLogger("concurrent.futures") 44 45class Error(Exception): 46 """Base class for all future-related exceptions.""" 47 pass 48 49class CancelledError(Error): 50 """The Future was cancelled.""" 51 pass 52 53class TimeoutError(Error): 54 """The operation exceeded the given deadline.""" 55 pass 56 57class InvalidStateError(Error): 58 """The operation is not allowed in this state.""" 59 pass 60 61class _Waiter(object): 62 """Provides the event that wait() and as_completed() block on.""" 63 def __init__(self): 64 self.event = threading.Event() 65 self.finished_futures = [] 66 67 def add_result(self, future): 68 self.finished_futures.append(future) 69 70 def add_exception(self, future): 71 self.finished_futures.append(future) 72 73 def add_cancelled(self, future): 74 self.finished_futures.append(future) 75 76class _AsCompletedWaiter(_Waiter): 77 """Used by as_completed().""" 78 79 def __init__(self): 80 super(_AsCompletedWaiter, self).__init__() 81 self.lock = threading.Lock() 82 83 def add_result(self, future): 84 with self.lock: 85 super(_AsCompletedWaiter, self).add_result(future) 86 self.event.set() 87 88 def add_exception(self, future): 89 with self.lock: 90 super(_AsCompletedWaiter, self).add_exception(future) 91 self.event.set() 92 93 def add_cancelled(self, future): 94 with self.lock: 95 super(_AsCompletedWaiter, self).add_cancelled(future) 96 self.event.set() 97 98class _FirstCompletedWaiter(_Waiter): 99 """Used by wait(return_when=FIRST_COMPLETED).""" 100 101 def add_result(self, future): 102 super().add_result(future) 103 self.event.set() 104 105 def add_exception(self, future): 106 super().add_exception(future) 107 self.event.set() 108 109 def add_cancelled(self, future): 110 super().add_cancelled(future) 111 self.event.set() 112 113class _AllCompletedWaiter(_Waiter): 114 """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED).""" 115 116 def __init__(self, num_pending_calls, stop_on_exception): 117 self.num_pending_calls = num_pending_calls 118 self.stop_on_exception = stop_on_exception 119 self.lock = threading.Lock() 120 super().__init__() 121 122 def _decrement_pending_calls(self): 123 with self.lock: 124 self.num_pending_calls -= 1 125 if not self.num_pending_calls: 126 self.event.set() 127 128 def add_result(self, future): 129 super().add_result(future) 130 self._decrement_pending_calls() 131 132 def add_exception(self, future): 133 super().add_exception(future) 134 if self.stop_on_exception: 135 self.event.set() 136 else: 137 self._decrement_pending_calls() 138 139 def add_cancelled(self, future): 140 super().add_cancelled(future) 141 self._decrement_pending_calls() 142 143class _AcquireFutures(object): 144 """A context manager that does an ordered acquire of Future conditions.""" 145 146 def __init__(self, futures): 147 self.futures = sorted(futures, key=id) 148 149 def __enter__(self): 150 for future in self.futures: 151 future._condition.acquire() 152 153 def __exit__(self, *args): 154 for future in self.futures: 155 future._condition.release() 156 157def _create_and_install_waiters(fs, return_when): 158 if return_when == _AS_COMPLETED: 159 waiter = _AsCompletedWaiter() 160 elif return_when == FIRST_COMPLETED: 161 waiter = _FirstCompletedWaiter() 162 else: 163 pending_count = sum( 164 f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs) 165 166 if return_when == FIRST_EXCEPTION: 167 waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True) 168 elif return_when == ALL_COMPLETED: 169 waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False) 170 else: 171 raise ValueError("Invalid return condition: %r" % return_when) 172 173 for f in fs: 174 f._waiters.append(waiter) 175 176 return waiter 177 178 179def _yield_finished_futures(fs, waiter, ref_collect): 180 """ 181 Iterate on the list *fs*, yielding finished futures one by one in 182 reverse order. 183 Before yielding a future, *waiter* is removed from its waiters 184 and the future is removed from each set in the collection of sets 185 *ref_collect*. 186 187 The aim of this function is to avoid keeping stale references after 188 the future is yielded and before the iterator resumes. 189 """ 190 while fs: 191 f = fs[-1] 192 for futures_set in ref_collect: 193 futures_set.remove(f) 194 with f._condition: 195 f._waiters.remove(waiter) 196 del f 197 # Careful not to keep a reference to the popped value 198 yield fs.pop() 199 200 201def as_completed(fs, timeout=None): 202 """An iterator over the given futures that yields each as it completes. 203 204 Args: 205 fs: The sequence of Futures (possibly created by different Executors) to 206 iterate over. 207 timeout: The maximum number of seconds to wait. If None, then there 208 is no limit on the wait time. 209 210 Returns: 211 An iterator that yields the given Futures as they complete (finished or 212 cancelled). If any given Futures are duplicated, they will be returned 213 once. 214 215 Raises: 216 TimeoutError: If the entire result iterator could not be generated 217 before the given timeout. 218 """ 219 if timeout is not None: 220 end_time = timeout + time.monotonic() 221 222 fs = set(fs) 223 total_futures = len(fs) 224 with _AcquireFutures(fs): 225 finished = set( 226 f for f in fs 227 if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) 228 pending = fs - finished 229 waiter = _create_and_install_waiters(fs, _AS_COMPLETED) 230 finished = list(finished) 231 try: 232 yield from _yield_finished_futures(finished, waiter, 233 ref_collect=(fs,)) 234 235 while pending: 236 if timeout is None: 237 wait_timeout = None 238 else: 239 wait_timeout = end_time - time.monotonic() 240 if wait_timeout < 0: 241 raise TimeoutError( 242 '%d (of %d) futures unfinished' % ( 243 len(pending), total_futures)) 244 245 waiter.event.wait(wait_timeout) 246 247 with waiter.lock: 248 finished = waiter.finished_futures 249 waiter.finished_futures = [] 250 waiter.event.clear() 251 252 # reverse to keep finishing order 253 finished.reverse() 254 yield from _yield_finished_futures(finished, waiter, 255 ref_collect=(fs, pending)) 256 257 finally: 258 # Remove waiter from unfinished futures 259 for f in fs: 260 with f._condition: 261 f._waiters.remove(waiter) 262 263DoneAndNotDoneFutures = collections.namedtuple( 264 'DoneAndNotDoneFutures', 'done not_done') 265def wait(fs, timeout=None, return_when=ALL_COMPLETED): 266 """Wait for the futures in the given sequence to complete. 267 268 Args: 269 fs: The sequence of Futures (possibly created by different Executors) to 270 wait upon. 271 timeout: The maximum number of seconds to wait. If None, then there 272 is no limit on the wait time. 273 return_when: Indicates when this function should return. The options 274 are: 275 276 FIRST_COMPLETED - Return when any future finishes or is 277 cancelled. 278 FIRST_EXCEPTION - Return when any future finishes by raising an 279 exception. If no future raises an exception 280 then it is equivalent to ALL_COMPLETED. 281 ALL_COMPLETED - Return when all futures finish or are cancelled. 282 283 Returns: 284 A named 2-tuple of sets. The first set, named 'done', contains the 285 futures that completed (is finished or cancelled) before the wait 286 completed. The second set, named 'not_done', contains uncompleted 287 futures. Duplicate futures given to *fs* are removed and will be 288 returned only once. 289 """ 290 fs = set(fs) 291 with _AcquireFutures(fs): 292 done = {f for f in fs 293 if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]} 294 not_done = fs - done 295 if (return_when == FIRST_COMPLETED) and done: 296 return DoneAndNotDoneFutures(done, not_done) 297 elif (return_when == FIRST_EXCEPTION) and done: 298 if any(f for f in done 299 if not f.cancelled() and f.exception() is not None): 300 return DoneAndNotDoneFutures(done, not_done) 301 302 if len(done) == len(fs): 303 return DoneAndNotDoneFutures(done, not_done) 304 305 waiter = _create_and_install_waiters(fs, return_when) 306 307 waiter.event.wait(timeout) 308 for f in fs: 309 with f._condition: 310 f._waiters.remove(waiter) 311 312 done.update(waiter.finished_futures) 313 return DoneAndNotDoneFutures(done, fs - done) 314 315class Future(object): 316 """Represents the result of an asynchronous computation.""" 317 318 def __init__(self): 319 """Initializes the future. Should not be called by clients.""" 320 self._condition = threading.Condition() 321 self._state = PENDING 322 self._result = None 323 self._exception = None 324 self._waiters = [] 325 self._done_callbacks = [] 326 327 def _invoke_callbacks(self): 328 for callback in self._done_callbacks: 329 try: 330 callback(self) 331 except Exception: 332 LOGGER.exception('exception calling callback for %r', self) 333 334 def __repr__(self): 335 with self._condition: 336 if self._state == FINISHED: 337 if self._exception: 338 return '<%s at %#x state=%s raised %s>' % ( 339 self.__class__.__name__, 340 id(self), 341 _STATE_TO_DESCRIPTION_MAP[self._state], 342 self._exception.__class__.__name__) 343 else: 344 return '<%s at %#x state=%s returned %s>' % ( 345 self.__class__.__name__, 346 id(self), 347 _STATE_TO_DESCRIPTION_MAP[self._state], 348 self._result.__class__.__name__) 349 return '<%s at %#x state=%s>' % ( 350 self.__class__.__name__, 351 id(self), 352 _STATE_TO_DESCRIPTION_MAP[self._state]) 353 354 def cancel(self): 355 """Cancel the future if possible. 356 357 Returns True if the future was cancelled, False otherwise. A future 358 cannot be cancelled if it is running or has already completed. 359 """ 360 with self._condition: 361 if self._state in [RUNNING, FINISHED]: 362 return False 363 364 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: 365 return True 366 367 self._state = CANCELLED 368 self._condition.notify_all() 369 370 self._invoke_callbacks() 371 return True 372 373 def cancelled(self): 374 """Return True if the future was cancelled.""" 375 with self._condition: 376 return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED] 377 378 def running(self): 379 """Return True if the future is currently executing.""" 380 with self._condition: 381 return self._state == RUNNING 382 383 def done(self): 384 """Return True of the future was cancelled or finished executing.""" 385 with self._condition: 386 return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] 387 388 def __get_result(self): 389 if self._exception: 390 try: 391 raise self._exception 392 finally: 393 # Break a reference cycle with the exception in self._exception 394 self = None 395 else: 396 return self._result 397 398 def add_done_callback(self, fn): 399 """Attaches a callable that will be called when the future finishes. 400 401 Args: 402 fn: A callable that will be called with this future as its only 403 argument when the future completes or is cancelled. The callable 404 will always be called by a thread in the same process in which 405 it was added. If the future has already completed or been 406 cancelled then the callable will be called immediately. These 407 callables are called in the order that they were added. 408 """ 409 with self._condition: 410 if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]: 411 self._done_callbacks.append(fn) 412 return 413 try: 414 fn(self) 415 except Exception: 416 LOGGER.exception('exception calling callback for %r', self) 417 418 def result(self, timeout=None): 419 """Return the result of the call that the future represents. 420 421 Args: 422 timeout: The number of seconds to wait for the result if the future 423 isn't done. If None, then there is no limit on the wait time. 424 425 Returns: 426 The result of the call that the future represents. 427 428 Raises: 429 CancelledError: If the future was cancelled. 430 TimeoutError: If the future didn't finish executing before the given 431 timeout. 432 Exception: If the call raised then that exception will be raised. 433 """ 434 try: 435 with self._condition: 436 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: 437 raise CancelledError() 438 elif self._state == FINISHED: 439 return self.__get_result() 440 441 self._condition.wait(timeout) 442 443 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: 444 raise CancelledError() 445 elif self._state == FINISHED: 446 return self.__get_result() 447 else: 448 raise TimeoutError() 449 finally: 450 # Break a reference cycle with the exception in self._exception 451 self = None 452 453 def exception(self, timeout=None): 454 """Return the exception raised by the call that the future represents. 455 456 Args: 457 timeout: The number of seconds to wait for the exception if the 458 future isn't done. If None, then there is no limit on the wait 459 time. 460 461 Returns: 462 The exception raised by the call that the future represents or None 463 if the call completed without raising. 464 465 Raises: 466 CancelledError: If the future was cancelled. 467 TimeoutError: If the future didn't finish executing before the given 468 timeout. 469 """ 470 471 with self._condition: 472 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: 473 raise CancelledError() 474 elif self._state == FINISHED: 475 return self._exception 476 477 self._condition.wait(timeout) 478 479 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: 480 raise CancelledError() 481 elif self._state == FINISHED: 482 return self._exception 483 else: 484 raise TimeoutError() 485 486 # The following methods should only be used by Executors and in tests. 487 def set_running_or_notify_cancel(self): 488 """Mark the future as running or process any cancel notifications. 489 490 Should only be used by Executor implementations and unit tests. 491 492 If the future has been cancelled (cancel() was called and returned 493 True) then any threads waiting on the future completing (though calls 494 to as_completed() or wait()) are notified and False is returned. 495 496 If the future was not cancelled then it is put in the running state 497 (future calls to running() will return True) and True is returned. 498 499 This method should be called by Executor implementations before 500 executing the work associated with this future. If this method returns 501 False then the work should not be executed. 502 503 Returns: 504 False if the Future was cancelled, True otherwise. 505 506 Raises: 507 RuntimeError: if this method was already called or if set_result() 508 or set_exception() was called. 509 """ 510 with self._condition: 511 if self._state == CANCELLED: 512 self._state = CANCELLED_AND_NOTIFIED 513 for waiter in self._waiters: 514 waiter.add_cancelled(self) 515 # self._condition.notify_all() is not necessary because 516 # self.cancel() triggers a notification. 517 return False 518 elif self._state == PENDING: 519 self._state = RUNNING 520 return True 521 else: 522 LOGGER.critical('Future %s in unexpected state: %s', 523 id(self), 524 self._state) 525 raise RuntimeError('Future in unexpected state') 526 527 def set_result(self, result): 528 """Sets the return value of work associated with the future. 529 530 Should only be used by Executor implementations and unit tests. 531 """ 532 with self._condition: 533 if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}: 534 raise InvalidStateError('{}: {!r}'.format(self._state, self)) 535 self._result = result 536 self._state = FINISHED 537 for waiter in self._waiters: 538 waiter.add_result(self) 539 self._condition.notify_all() 540 self._invoke_callbacks() 541 542 def set_exception(self, exception): 543 """Sets the result of the future as being the given exception. 544 545 Should only be used by Executor implementations and unit tests. 546 """ 547 with self._condition: 548 if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}: 549 raise InvalidStateError('{}: {!r}'.format(self._state, self)) 550 self._exception = exception 551 self._state = FINISHED 552 for waiter in self._waiters: 553 waiter.add_exception(self) 554 self._condition.notify_all() 555 self._invoke_callbacks() 556 557 __class_getitem__ = classmethod(types.GenericAlias) 558 559class Executor(object): 560 """This is an abstract base class for concrete asynchronous executors.""" 561 562 def submit(self, fn, /, *args, **kwargs): 563 """Submits a callable to be executed with the given arguments. 564 565 Schedules the callable to be executed as fn(*args, **kwargs) and returns 566 a Future instance representing the execution of the callable. 567 568 Returns: 569 A Future representing the given call. 570 """ 571 raise NotImplementedError() 572 573 def map(self, fn, *iterables, timeout=None, chunksize=1): 574 """Returns an iterator equivalent to map(fn, iter). 575 576 Args: 577 fn: A callable that will take as many arguments as there are 578 passed iterables. 579 timeout: The maximum number of seconds to wait. If None, then there 580 is no limit on the wait time. 581 chunksize: The size of the chunks the iterable will be broken into 582 before being passed to a child process. This argument is only 583 used by ProcessPoolExecutor; it is ignored by 584 ThreadPoolExecutor. 585 586 Returns: 587 An iterator equivalent to: map(func, *iterables) but the calls may 588 be evaluated out-of-order. 589 590 Raises: 591 TimeoutError: If the entire result iterator could not be generated 592 before the given timeout. 593 Exception: If fn(*args) raises for any values. 594 """ 595 if timeout is not None: 596 end_time = timeout + time.monotonic() 597 598 fs = [self.submit(fn, *args) for args in zip(*iterables)] 599 600 # Yield must be hidden in closure so that the futures are submitted 601 # before the first iterator value is required. 602 def result_iterator(): 603 try: 604 # reverse to keep finishing order 605 fs.reverse() 606 while fs: 607 # Careful not to keep a reference to the popped future 608 if timeout is None: 609 yield fs.pop().result() 610 else: 611 yield fs.pop().result(end_time - time.monotonic()) 612 finally: 613 for future in fs: 614 future.cancel() 615 return result_iterator() 616 617 def shutdown(self, wait=True, *, cancel_futures=False): 618 """Clean-up the resources associated with the Executor. 619 620 It is safe to call this method several times. Otherwise, no other 621 methods can be called after this one. 622 623 Args: 624 wait: If True then shutdown will not return until all running 625 futures have finished executing and the resources used by the 626 executor have been reclaimed. 627 cancel_futures: If True then shutdown will cancel all pending 628 futures. Futures that are completed or running will not be 629 cancelled. 630 """ 631 pass 632 633 def __enter__(self): 634 return self 635 636 def __exit__(self, exc_type, exc_val, exc_tb): 637 self.shutdown(wait=True) 638 return False 639 640 641class BrokenExecutor(RuntimeError): 642 """ 643 Raised when a executor has become non-functional after a severe failure. 644 """ 645