# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

__author__ = 'Brian Quinlan (brian@sweetapp.com)'

import collections
import logging
import threading
import time
import types

FIRST_COMPLETED = 'FIRST_COMPLETED'
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
ALL_COMPLETED = 'ALL_COMPLETED'
_AS_COMPLETED = '_AS_COMPLETED'

# Possible future states (for internal use by the futures package).
PENDING = 'PENDING'
RUNNING = 'RUNNING'
# The future was cancelled by the user...
CANCELLED = 'CANCELLED'
# ...and _Waiter.add_cancelled() was called by a worker.
CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
FINISHED = 'FINISHED'

_FUTURE_STATES = [
    PENDING,
    RUNNING,
    CANCELLED,
    CANCELLED_AND_NOTIFIED,
    FINISHED
]

_STATE_TO_DESCRIPTION_MAP = {
    PENDING: "pending",
    RUNNING: "running",
    CANCELLED: "cancelled",
    CANCELLED_AND_NOTIFIED: "cancelled",
    FINISHED: "finished"
}

# Logger for internal use by the futures package.
LOGGER = logging.getLogger("concurrent.futures")


class Error(Exception):
    """Base class for all future-related exceptions."""


class CancelledError(Error):
    """The Future was cancelled."""


class TimeoutError(Error):
    """The operation exceeded the given deadline."""


class InvalidStateError(Error):
    """The operation is not allowed in this state."""


class _Waiter(object):
    """Provides the event that wait() and as_completed() block on."""

    def __init__(self):
        self.event = threading.Event()
        self.finished_futures = []

    def add_result(self, future):
        """Record *future* as finished with a result."""
        self.finished_futures.append(future)

    def add_exception(self, future):
        """Record *future* as finished with an exception."""
        self.finished_futures.append(future)

    def add_cancelled(self, future):
        """Record *future* as cancelled."""
        self.finished_futures.append(future)


class _AsCompletedWaiter(_Waiter):
    """Used by as_completed()."""

    def __init__(self):
        super().__init__()
        # Guards finished_futures and the event: as_completed() swaps the
        # list and clears the event under this same lock.
        self.lock = threading.Lock()

    def add_result(self, future):
        with self.lock:
            super().add_result(future)
            self.event.set()

    def add_exception(self, future):
        with self.lock:
            super().add_exception(future)
            self.event.set()

    def add_cancelled(self, future):
        with self.lock:
            super().add_cancelled(future)
            self.event.set()


class _FirstCompletedWaiter(_Waiter):
    """Used by wait(return_when=FIRST_COMPLETED)."""

    def add_result(self, future):
        super().add_result(future)
        self.event.set()

    def add_exception(self, future):
        super().add_exception(future)
        self.event.set()

    def add_cancelled(self, future):
        super().add_cancelled(future)
        self.event.set()


class _AllCompletedWaiter(_Waiter):
    """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED)."""

    def __init__(self, num_pending_calls, stop_on_exception):
        super().__init__()
        self.num_pending_calls = num_pending_calls
        self.stop_on_exception = stop_on_exception
        self.lock = threading.Lock()

    def _decrement_pending_calls(self):
        """Count one more completed call; set the event when none remain."""
        with self.lock:
            self.num_pending_calls -= 1
            if not self.num_pending_calls:
                self.event.set()

    def add_result(self, future):
        super().add_result(future)
        self._decrement_pending_calls()

    def add_exception(self, future):
        super().add_exception(future)
        if self.stop_on_exception:
            # FIRST_EXCEPTION: wake the waiting thread right away.
            self.event.set()
        else:
            self._decrement_pending_calls()

    def add_cancelled(self, future):
        super().add_cancelled(future)
        self._decrement_pending_calls()


class _AcquireFutures(object):
    """A context manager that does an ordered acquire of Future conditions."""

    def __init__(self, futures):
        # Always acquire in the same (id) order.
        self.futures = sorted(futures, key=id)

    def __enter__(self):
        for future in self.futures:
            future._condition.acquire()

    def __exit__(self, *args):
        for future in self.futures:
            future._condition.release()


def _create_and_install_waiters(fs, return_when):
    """Create the waiter matching *return_when* and attach it to every future.

    Args:
        fs: The futures to install the waiter on.
        return_when: One of _AS_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION
            or ALL_COMPLETED.

    Returns:
        The newly created waiter, already appended to each future's
        ``_waiters`` list.

    Raises:
        ValueError: If *return_when* is not one of the known conditions.
    """
    if return_when == _AS_COMPLETED:
        waiter = _AsCompletedWaiter()
    elif return_when == FIRST_COMPLETED:
        waiter = _FirstCompletedWaiter()
    else:
        pending_count = sum(
                f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs)

        if return_when == FIRST_EXCEPTION:
            waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True)
        elif return_when == ALL_COMPLETED:
            waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False)
        else:
            raise ValueError("Invalid return condition: %r" % return_when)

    for f in fs:
        f._waiters.append(waiter)

    return waiter


def _yield_finished_futures(fs, waiter, ref_collect):
    """
    Iterate on the list *fs*, yielding finished futures one by one in
    reverse order.
    Before yielding a future, *waiter* is removed from its waiters
    and the future is removed from each set in the collection of sets
    *ref_collect*.

    The aim of this function is to avoid keeping stale references after
    the future is yielded and before the iterator resumes.
    """
    while fs:
        f = fs[-1]
        for futures_set in ref_collect:
            futures_set.remove(f)
        with f._condition:
            f._waiters.remove(waiter)
        del f
        # Careful not to keep a reference to the popped value
        yield fs.pop()


def as_completed(fs, timeout=None):
    """An iterator over the given futures that yields each as it completes.

    Args:
        fs: The sequence of Futures (possibly created by different Executors) to
            iterate over.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.

    Returns:
        An iterator that yields the given Futures as they complete (finished or
        cancelled). If any given Futures are duplicated, they will be returned
        once.

    Raises:
        TimeoutError: If the entire result iterator could not be generated
            before the given timeout.
    """
    if timeout is not None:
        end_time = timeout + time.monotonic()

    fs = set(fs)
    total_futures = len(fs)
    with _AcquireFutures(fs):
        finished = {
                f for f in fs
                if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]}
        pending = fs - finished
        waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
    finished = list(finished)
    try:
        yield from _yield_finished_futures(finished, waiter,
                                           ref_collect=(fs,))

        while pending:
            if timeout is None:
                wait_timeout = None
            else:
                wait_timeout = end_time - time.monotonic()
                if wait_timeout < 0:
                    raise TimeoutError(
                            '%d (of %d) futures unfinished' % (
                            len(pending), total_futures))

            waiter.event.wait(wait_timeout)

            # Take everything that finished so far and re-arm the event
            # atomically with respect to the waiter's add_* methods.
            with waiter.lock:
                finished = waiter.finished_futures
                waiter.finished_futures = []
                waiter.event.clear()

            # reverse to keep finishing order
            finished.reverse()
            yield from _yield_finished_futures(finished, waiter,
                                               ref_collect=(fs, pending))

    finally:
        # Remove waiter from unfinished futures
        for f in fs:
            with f._condition:
                f._waiters.remove(waiter)


DoneAndNotDoneFutures = collections.namedtuple(
        'DoneAndNotDoneFutures', 'done not_done')


def wait(fs, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the futures in the given sequence to complete.

    Args:
        fs: The sequence of Futures (possibly created by different Executors) to
            wait upon. Any iterable is accepted; duplicates are ignored.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.
        return_when: Indicates when this function should return. The options
            are:

            FIRST_COMPLETED - Return when any future finishes or is
                              cancelled.
            FIRST_EXCEPTION - Return when any future finishes by raising an
                              exception. If no future raises an exception
                              then it is equivalent to ALL_COMPLETED.
            ALL_COMPLETED -   Return when all futures finish or are cancelled.

    Returns:
        A named 2-tuple of sets. The first set, named 'done', contains the
        futures that completed (is finished or cancelled) before the wait
        completed. The second set, named 'not_done', contains uncompleted
        futures.

    Raises:
        ValueError: If *return_when* is not one of the documented options
            and at least one future is still outstanding.
    """
    # Materialise and de-duplicate up front: *fs* is iterated several times
    # below (which would exhaust a one-shot iterator), and a duplicated
    # future would make the "everything is already done" length check fail
    # and leave the caller blocked on an event that nobody will set.
    fs = set(fs)
    with _AcquireFutures(fs):
        done = {f for f in fs
                if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]}
        not_done = fs - done

        if (return_when == FIRST_COMPLETED) and done:
            return DoneAndNotDoneFutures(done, not_done)
        elif (return_when == FIRST_EXCEPTION) and done:
            if any(f for f in done
                   if not f.cancelled() and f.exception() is not None):
                return DoneAndNotDoneFutures(done, not_done)

        if len(done) == len(fs):
            return DoneAndNotDoneFutures(done, not_done)

        waiter = _create_and_install_waiters(fs, return_when)

    waiter.event.wait(timeout)
    for f in fs:
        with f._condition:
            f._waiters.remove(waiter)

    done.update(waiter.finished_futures)
    return DoneAndNotDoneFutures(done, fs - done)


class Future(object):
    """Represents the result of an asynchronous computation."""

    def __init__(self):
        """Initializes the future. Should not be called by clients."""
        self._condition = threading.Condition()
        self._state = PENDING
        self._result = None
        self._exception = None
        self._waiters = []
        self._done_callbacks = []

    def _invoke_callbacks(self):
        """Call every registered done-callback, logging (not raising) errors."""
        for callback in self._done_callbacks:
            try:
                callback(self)
            except Exception:
                LOGGER.exception('exception calling callback for %r', self)

    def __repr__(self):
        with self._condition:
            if self._state == FINISHED:
                if self._exception:
                    return '<%s at %#x state=%s raised %s>' % (
                        self.__class__.__name__,
                        id(self),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._exception.__class__.__name__)
                else:
                    return '<%s at %#x state=%s returned %s>' % (
                        self.__class__.__name__,
                        id(self),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._result.__class__.__name__)
            return '<%s at %#x state=%s>' % (
                    self.__class__.__name__,
                    id(self),
                   _STATE_TO_DESCRIPTION_MAP[self._state])

    def cancel(self):
        """Cancel the future if possible.

        Returns True if the future was cancelled, False otherwise. A future
        cannot be cancelled if it is running or has already completed.
        """
        with self._condition:
            if self._state in [RUNNING, FINISHED]:
                return False

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                return True

            self._state = CANCELLED
            self._condition.notify_all()

        # Callbacks run outside the lock.
        self._invoke_callbacks()
        return True

    def cancelled(self):
        """Return True if the future was cancelled."""
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]

    def running(self):
        """Return True if the future is currently executing."""
        with self._condition:
            return self._state == RUNNING

    def done(self):
        """Return True if the future was cancelled or finished executing."""
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]

    def __get_result(self):
        """Return the stored result, or raise the stored exception."""
        if self._exception:
            try:
                raise self._exception
            finally:
                # Break a reference cycle with the exception in self._exception
                self = None
        else:
            return self._result

    def add_done_callback(self, fn):
        """Attaches a callable that will be called when the future finishes.

        Args:
            fn: A callable that will be called with this future as its only
                argument when the future completes or is cancelled. The callable
                will always be called by a thread in the same process in which
                it was added. If the future has already completed or been
                cancelled then the callable will be called immediately. These
                callables are called in the order that they were added.
        """
        with self._condition:
            if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
                self._done_callbacks.append(fn)
                return
        try:
            fn(self)
        except Exception:
            LOGGER.exception('exception calling callback for %r', self)

    def result(self, timeout=None):
        """Return the result of the call that the future represents.

        Args:
            timeout: The number of seconds to wait for the result if the future
                isn't done. If None, then there is no limit on the wait time.

        Returns:
            The result of the call that the future represents.

        Raises:
            CancelledError: If the future was cancelled.
            TimeoutError: If the future didn't finish executing before the given
                timeout.
            Exception: If the call raised then that exception will be raised.
        """
        try:
            with self._condition:
                if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                    raise CancelledError()
                elif self._state == FINISHED:
                    return self.__get_result()

                self._condition.wait(timeout)

                if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                    raise CancelledError()
                elif self._state == FINISHED:
                    return self.__get_result()
                else:
                    raise TimeoutError()
        finally:
            # Break a reference cycle with the exception in self._exception
            self = None

    def exception(self, timeout=None):
        """Return the exception raised by the call that the future represents.

        Args:
            timeout: The number of seconds to wait for the exception if the
                future isn't done. If None, then there is no limit on the wait
                time.

        Returns:
            The exception raised by the call that the future represents or None
            if the call completed without raising.

        Raises:
            CancelledError: If the future was cancelled.
            TimeoutError: If the future didn't finish executing before the given
                timeout.
        """
        try:
            with self._condition:
                if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                    raise CancelledError()
                elif self._state == FINISHED:
                    return self._exception

                self._condition.wait(timeout)

                if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                    raise CancelledError()
                elif self._state == FINISHED:
                    return self._exception
                else:
                    raise TimeoutError()
        finally:
            # Break a reference cycle with the exception in self._exception
            self = None

    # The following methods should only be used by Executors and in tests.
    def set_running_or_notify_cancel(self):
        """Mark the future as running or process any cancel notifications.

        Should only be used by Executor implementations and unit tests.

        If the future has been cancelled (cancel() was called and returned
        True) then any threads waiting on the future completing (through calls
        to as_completed() or wait()) are notified and False is returned.

        If the future was not cancelled then it is put in the running state
        (future calls to running() will return True) and True is returned.

        This method should be called by Executor implementations before
        executing the work associated with this future. If this method returns
        False then the work should not be executed.

        Returns:
            False if the Future was cancelled, True otherwise.

        Raises:
            RuntimeError: if this method was already called or if set_result()
                or set_exception() was called.
        """
        with self._condition:
            if self._state == CANCELLED:
                self._state = CANCELLED_AND_NOTIFIED
                for waiter in self._waiters:
                    waiter.add_cancelled(self)
                # self._condition.notify_all() is not necessary because
                # self.cancel() triggers a notification.
                return False
            elif self._state == PENDING:
                self._state = RUNNING
                return True
            else:
                LOGGER.critical('Future %s in unexpected state: %s',
                                id(self),
                                self._state)
                raise RuntimeError('Future in unexpected state')

    def set_result(self, result):
        """Sets the return value of work associated with the future.

        Should only be used by Executor implementations and unit tests.

        Raises:
            InvalidStateError: If the future is already cancelled or finished.
        """
        with self._condition:
            if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}:
                raise InvalidStateError('{}: {!r}'.format(self._state, self))
            self._result = result
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_result(self)
            self._condition.notify_all()
        self._invoke_callbacks()

    def set_exception(self, exception):
        """Sets the result of the future as being the given exception.

        Should only be used by Executor implementations and unit tests.

        Raises:
            InvalidStateError: If the future is already cancelled or finished.
        """
        with self._condition:
            if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}:
                raise InvalidStateError('{}: {!r}'.format(self._state, self))
            self._exception = exception
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_exception(self)
            self._condition.notify_all()
        self._invoke_callbacks()

    __class_getitem__ = classmethod(types.GenericAlias)


def _result_or_cancel(fut, timeout=None):
    """Return ``fut.result(timeout)``, cancelling *fut* afterwards.

    Cancelling a future that already finished is a no-op, so this only has
    an effect when result() raised (for example on timeout) while the
    future was still pending.
    """
    try:
        try:
            return fut.result(timeout)
        finally:
            fut.cancel()
    finally:
        # Break a reference cycle with the exception in fut._exception
        del fut


class Executor(object):
    """This is an abstract base class for concrete asynchronous executors."""

    def submit(self, fn, /, *args, **kwargs):
        """Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and returns
        a Future instance representing the execution of the callable.

        Returns:
            A Future representing the given call.
        """
        raise NotImplementedError()

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, iter).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: The size of the chunks the iterable will be broken into
                before being passed to a child process. This argument is only
                used by ProcessPoolExecutor; it is ignored by
                ThreadPoolExecutor.

        Returns:
            An iterator equivalent to: map(func, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        if timeout is not None:
            end_time = timeout + time.monotonic()

        fs = [self.submit(fn, *args) for args in zip(*iterables)]

        # Yield must be hidden in closure so that the futures are submitted
        # before the first iterator value is required.
        def result_iterator():
            try:
                # reverse to keep finishing order
                fs.reverse()
                while fs:
                    # Careful not to keep a reference to the popped future
                    if timeout is None:
                        yield _result_or_cancel(fs.pop())
                    else:
                        yield _result_or_cancel(
                            fs.pop(), end_time - time.monotonic())
            finally:
                for future in fs:
                    future.cancel()
        return result_iterator()

    def shutdown(self, wait=True, *, cancel_futures=False):
        """Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Args:
            wait: If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                executor have been reclaimed.
            cancel_futures: If True then shutdown will cancel all pending
                futures. Futures that are completed or running will not be
                cancelled.
        """
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown(wait=True)
        return False


class BrokenExecutor(RuntimeError):
    """
    Raised when an executor has become non-functional after a severe failure.
    """