1# Copyright 2009 Brian Quinlan. All Rights Reserved. 2# Licensed to PSF under a Contributor Agreement. 3 4__author__ = 'Brian Quinlan (brian@sweetapp.com)' 5 6import collections 7import logging 8import threading 9import time 10 11FIRST_COMPLETED = 'FIRST_COMPLETED' 12FIRST_EXCEPTION = 'FIRST_EXCEPTION' 13ALL_COMPLETED = 'ALL_COMPLETED' 14_AS_COMPLETED = '_AS_COMPLETED' 15 16# Possible future states (for internal use by the futures package). 17PENDING = 'PENDING' 18RUNNING = 'RUNNING' 19# The future was cancelled by the user... 20CANCELLED = 'CANCELLED' 21# ...and _Waiter.add_cancelled() was called by a worker. 22CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED' 23FINISHED = 'FINISHED' 24 25_FUTURE_STATES = [ 26 PENDING, 27 RUNNING, 28 CANCELLED, 29 CANCELLED_AND_NOTIFIED, 30 FINISHED 31] 32 33_STATE_TO_DESCRIPTION_MAP = { 34 PENDING: "pending", 35 RUNNING: "running", 36 CANCELLED: "cancelled", 37 CANCELLED_AND_NOTIFIED: "cancelled", 38 FINISHED: "finished" 39} 40 41# Logger for internal use by the futures package. 42LOGGER = logging.getLogger("concurrent.futures") 43 44class Error(Exception): 45 """Base class for all future-related exceptions.""" 46 pass 47 48class CancelledError(Error): 49 """The Future was cancelled.""" 50 pass 51 52class TimeoutError(Error): 53 """The operation exceeded the given deadline.""" 54 pass 55 56class _Waiter(object): 57 """Provides the event that wait() and as_completed() block on.""" 58 def __init__(self): 59 self.event = threading.Event() 60 self.finished_futures = [] 61 62 def add_result(self, future): 63 self.finished_futures.append(future) 64 65 def add_exception(self, future): 66 self.finished_futures.append(future) 67 68 def add_cancelled(self, future): 69 self.finished_futures.append(future) 70 71class _AsCompletedWaiter(_Waiter): 72 """Used by as_completed().""" 73 74 def __init__(self): 75 super(_AsCompletedWaiter, self).__init__() 76 self.lock = threading.Lock() 77 78 def add_result(self, future): 79 with self.lock: 80 super(_AsCompletedWaiter, self).add_result(future) 81 self.event.set() 82 83 def add_exception(self, future): 84 with self.lock: 85 super(_AsCompletedWaiter, self).add_exception(future) 86 self.event.set() 87 88 def add_cancelled(self, future): 89 with self.lock: 90 super(_AsCompletedWaiter, self).add_cancelled(future) 91 self.event.set() 92 93class _FirstCompletedWaiter(_Waiter): 94 """Used by wait(return_when=FIRST_COMPLETED).""" 95 96 def add_result(self, future): 97 super().add_result(future) 98 self.event.set() 99 100 def add_exception(self, future): 101 super().add_exception(future) 102 self.event.set() 103 104 def add_cancelled(self, future): 105 super().add_cancelled(future) 106 self.event.set() 107 108class _AllCompletedWaiter(_Waiter): 109 """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED).""" 110 111 def __init__(self, num_pending_calls, stop_on_exception): 112 self.num_pending_calls = num_pending_calls 113 self.stop_on_exception = stop_on_exception 114 self.lock = threading.Lock() 115 super().__init__() 116 117 def _decrement_pending_calls(self): 118 with self.lock: 119 self.num_pending_calls -= 1 120 if not self.num_pending_calls: 121 self.event.set() 122 123 def add_result(self, future): 124 super().add_result(future) 125 self._decrement_pending_calls() 126 127 def add_exception(self, future): 128 super().add_exception(future) 129 if self.stop_on_exception: 130 self.event.set() 131 else: 132 self._decrement_pending_calls() 133 134 def add_cancelled(self, future): 135 super().add_cancelled(future) 136 self._decrement_pending_calls() 137 138class _AcquireFutures(object): 139 """A context manager that does an ordered acquire of Future conditions.""" 140 141 def __init__(self, futures): 142 self.futures = sorted(futures, key=id) 143 144 def __enter__(self): 145 for future in self.futures: 146 future._condition.acquire() 147 148 def __exit__(self, *args): 149 for future in self.futures: 150 future._condition.release() 151 152def _create_and_install_waiters(fs, return_when): 153 if return_when == _AS_COMPLETED: 154 waiter = _AsCompletedWaiter() 155 elif return_when == FIRST_COMPLETED: 156 waiter = _FirstCompletedWaiter() 157 else: 158 pending_count = sum( 159 f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs) 160 161 if return_when == FIRST_EXCEPTION: 162 waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True) 163 elif return_when == ALL_COMPLETED: 164 waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False) 165 else: 166 raise ValueError("Invalid return condition: %r" % return_when) 167 168 for f in fs: 169 f._waiters.append(waiter) 170 171 return waiter 172 173 174def _yield_finished_futures(fs, waiter, ref_collect): 175 """ 176 Iterate on the list *fs*, yielding finished futures one by one in 177 reverse order. 178 Before yielding a future, *waiter* is removed from its waiters 179 and the future is removed from each set in the collection of sets 180 *ref_collect*. 181 182 The aim of this function is to avoid keeping stale references after 183 the future is yielded and before the iterator resumes. 184 """ 185 while fs: 186 f = fs[-1] 187 for futures_set in ref_collect: 188 futures_set.remove(f) 189 with f._condition: 190 f._waiters.remove(waiter) 191 del f 192 # Careful not to keep a reference to the popped value 193 yield fs.pop() 194 195 196def as_completed(fs, timeout=None): 197 """An iterator over the given futures that yields each as it completes. 198 199 Args: 200 fs: The sequence of Futures (possibly created by different Executors) to 201 iterate over. 202 timeout: The maximum number of seconds to wait. If None, then there 203 is no limit on the wait time. 204 205 Returns: 206 An iterator that yields the given Futures as they complete (finished or 207 cancelled). If any given Futures are duplicated, they will be returned 208 once. 209 210 Raises: 211 TimeoutError: If the entire result iterator could not be generated 212 before the given timeout. 213 """ 214 if timeout is not None: 215 end_time = timeout + time.monotonic() 216 217 fs = set(fs) 218 total_futures = len(fs) 219 with _AcquireFutures(fs): 220 finished = set( 221 f for f in fs 222 if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) 223 pending = fs - finished 224 waiter = _create_and_install_waiters(fs, _AS_COMPLETED) 225 finished = list(finished) 226 try: 227 yield from _yield_finished_futures(finished, waiter, 228 ref_collect=(fs,)) 229 230 while pending: 231 if timeout is None: 232 wait_timeout = None 233 else: 234 wait_timeout = end_time - time.monotonic() 235 if wait_timeout < 0: 236 raise TimeoutError( 237 '%d (of %d) futures unfinished' % ( 238 len(pending), total_futures)) 239 240 waiter.event.wait(wait_timeout) 241 242 with waiter.lock: 243 finished = waiter.finished_futures 244 waiter.finished_futures = [] 245 waiter.event.clear() 246 247 # reverse to keep finishing order 248 finished.reverse() 249 yield from _yield_finished_futures(finished, waiter, 250 ref_collect=(fs, pending)) 251 252 finally: 253 # Remove waiter from unfinished futures 254 for f in fs: 255 with f._condition: 256 f._waiters.remove(waiter) 257 258DoneAndNotDoneFutures = collections.namedtuple( 259 'DoneAndNotDoneFutures', 'done not_done') 260def wait(fs, timeout=None, return_when=ALL_COMPLETED): 261 """Wait for the futures in the given sequence to complete. 262 263 Args: 264 fs: The sequence of Futures (possibly created by different Executors) to 265 wait upon. 266 timeout: The maximum number of seconds to wait. If None, then there 267 is no limit on the wait time. 268 return_when: Indicates when this function should return. The options 269 are: 270 271 FIRST_COMPLETED - Return when any future finishes or is 272 cancelled. 273 FIRST_EXCEPTION - Return when any future finishes by raising an 274 exception. If no future raises an exception 275 then it is equivalent to ALL_COMPLETED. 276 ALL_COMPLETED - Return when all futures finish or are cancelled. 277 278 Returns: 279 A named 2-tuple of sets. The first set, named 'done', contains the 280 futures that completed (is finished or cancelled) before the wait 281 completed. The second set, named 'not_done', contains uncompleted 282 futures. 283 """ 284 with _AcquireFutures(fs): 285 done = set(f for f in fs 286 if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) 287 not_done = set(fs) - done 288 289 if (return_when == FIRST_COMPLETED) and done: 290 return DoneAndNotDoneFutures(done, not_done) 291 elif (return_when == FIRST_EXCEPTION) and done: 292 if any(f for f in done 293 if not f.cancelled() and f.exception() is not None): 294 return DoneAndNotDoneFutures(done, not_done) 295 296 if len(done) == len(fs): 297 return DoneAndNotDoneFutures(done, not_done) 298 299 waiter = _create_and_install_waiters(fs, return_when) 300 301 waiter.event.wait(timeout) 302 for f in fs: 303 with f._condition: 304 f._waiters.remove(waiter) 305 306 done.update(waiter.finished_futures) 307 return DoneAndNotDoneFutures(done, set(fs) - done) 308 309class Future(object): 310 """Represents the result of an asynchronous computation.""" 311 312 def __init__(self): 313 """Initializes the future. Should not be called by clients.""" 314 self._condition = threading.Condition() 315 self._state = PENDING 316 self._result = None 317 self._exception = None 318 self._waiters = [] 319 self._done_callbacks = [] 320 321 def _invoke_callbacks(self): 322 for callback in self._done_callbacks: 323 try: 324 callback(self) 325 except Exception: 326 LOGGER.exception('exception calling callback for %r', self) 327 328 def __repr__(self): 329 with self._condition: 330 if self._state == FINISHED: 331 if self._exception: 332 return '<%s at %#x state=%s raised %s>' % ( 333 self.__class__.__name__, 334 id(self), 335 _STATE_TO_DESCRIPTION_MAP[self._state], 336 self._exception.__class__.__name__) 337 else: 338 return '<%s at %#x state=%s returned %s>' % ( 339 self.__class__.__name__, 340 id(self), 341 _STATE_TO_DESCRIPTION_MAP[self._state], 342 self._result.__class__.__name__) 343 return '<%s at %#x state=%s>' % ( 344 self.__class__.__name__, 345 id(self), 346 _STATE_TO_DESCRIPTION_MAP[self._state]) 347 348 def cancel(self): 349 """Cancel the future if possible. 350 351 Returns True if the future was cancelled, False otherwise. A future 352 cannot be cancelled if it is running or has already completed. 353 """ 354 with self._condition: 355 if self._state in [RUNNING, FINISHED]: 356 return False 357 358 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: 359 return True 360 361 self._state = CANCELLED 362 self._condition.notify_all() 363 364 self._invoke_callbacks() 365 return True 366 367 def cancelled(self): 368 """Return True if the future was cancelled.""" 369 with self._condition: 370 return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED] 371 372 def running(self): 373 """Return True if the future is currently executing.""" 374 with self._condition: 375 return self._state == RUNNING 376 377 def done(self): 378 """Return True of the future was cancelled or finished executing.""" 379 with self._condition: 380 return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] 381 382 def __get_result(self): 383 if self._exception: 384 raise self._exception 385 else: 386 return self._result 387 388 def add_done_callback(self, fn): 389 """Attaches a callable that will be called when the future finishes. 390 391 Args: 392 fn: A callable that will be called with this future as its only 393 argument when the future completes or is cancelled. The callable 394 will always be called by a thread in the same process in which 395 it was added. If the future has already completed or been 396 cancelled then the callable will be called immediately. These 397 callables are called in the order that they were added. 398 """ 399 with self._condition: 400 if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]: 401 self._done_callbacks.append(fn) 402 return 403 fn(self) 404 405 def result(self, timeout=None): 406 """Return the result of the call that the future represents. 407 408 Args: 409 timeout: The number of seconds to wait for the result if the future 410 isn't done. If None, then there is no limit on the wait time. 411 412 Returns: 413 The result of the call that the future represents. 414 415 Raises: 416 CancelledError: If the future was cancelled. 417 TimeoutError: If the future didn't finish executing before the given 418 timeout. 419 Exception: If the call raised then that exception will be raised. 420 """ 421 with self._condition: 422 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: 423 raise CancelledError() 424 elif self._state == FINISHED: 425 return self.__get_result() 426 427 self._condition.wait(timeout) 428 429 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: 430 raise CancelledError() 431 elif self._state == FINISHED: 432 return self.__get_result() 433 else: 434 raise TimeoutError() 435 436 def exception(self, timeout=None): 437 """Return the exception raised by the call that the future represents. 438 439 Args: 440 timeout: The number of seconds to wait for the exception if the 441 future isn't done. If None, then there is no limit on the wait 442 time. 443 444 Returns: 445 The exception raised by the call that the future represents or None 446 if the call completed without raising. 447 448 Raises: 449 CancelledError: If the future was cancelled. 450 TimeoutError: If the future didn't finish executing before the given 451 timeout. 452 """ 453 454 with self._condition: 455 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: 456 raise CancelledError() 457 elif self._state == FINISHED: 458 return self._exception 459 460 self._condition.wait(timeout) 461 462 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: 463 raise CancelledError() 464 elif self._state == FINISHED: 465 return self._exception 466 else: 467 raise TimeoutError() 468 469 # The following methods should only be used by Executors and in tests. 470 def set_running_or_notify_cancel(self): 471 """Mark the future as running or process any cancel notifications. 472 473 Should only be used by Executor implementations and unit tests. 474 475 If the future has been cancelled (cancel() was called and returned 476 True) then any threads waiting on the future completing (though calls 477 to as_completed() or wait()) are notified and False is returned. 478 479 If the future was not cancelled then it is put in the running state 480 (future calls to running() will return True) and True is returned. 481 482 This method should be called by Executor implementations before 483 executing the work associated with this future. If this method returns 484 False then the work should not be executed. 485 486 Returns: 487 False if the Future was cancelled, True otherwise. 488 489 Raises: 490 RuntimeError: if this method was already called or if set_result() 491 or set_exception() was called. 492 """ 493 with self._condition: 494 if self._state == CANCELLED: 495 self._state = CANCELLED_AND_NOTIFIED 496 for waiter in self._waiters: 497 waiter.add_cancelled(self) 498 # self._condition.notify_all() is not necessary because 499 # self.cancel() triggers a notification. 500 return False 501 elif self._state == PENDING: 502 self._state = RUNNING 503 return True 504 else: 505 LOGGER.critical('Future %s in unexpected state: %s', 506 id(self), 507 self._state) 508 raise RuntimeError('Future in unexpected state') 509 510 def set_result(self, result): 511 """Sets the return value of work associated with the future. 512 513 Should only be used by Executor implementations and unit tests. 514 """ 515 with self._condition: 516 self._result = result 517 self._state = FINISHED 518 for waiter in self._waiters: 519 waiter.add_result(self) 520 self._condition.notify_all() 521 self._invoke_callbacks() 522 523 def set_exception(self, exception): 524 """Sets the result of the future as being the given exception. 525 526 Should only be used by Executor implementations and unit tests. 527 """ 528 with self._condition: 529 self._exception = exception 530 self._state = FINISHED 531 for waiter in self._waiters: 532 waiter.add_exception(self) 533 self._condition.notify_all() 534 self._invoke_callbacks() 535 536class Executor(object): 537 """This is an abstract base class for concrete asynchronous executors.""" 538 539 def submit(self, fn, *args, **kwargs): 540 """Submits a callable to be executed with the given arguments. 541 542 Schedules the callable to be executed as fn(*args, **kwargs) and returns 543 a Future instance representing the execution of the callable. 544 545 Returns: 546 A Future representing the given call. 547 """ 548 raise NotImplementedError() 549 550 def map(self, fn, *iterables, timeout=None, chunksize=1): 551 """Returns an iterator equivalent to map(fn, iter). 552 553 Args: 554 fn: A callable that will take as many arguments as there are 555 passed iterables. 556 timeout: The maximum number of seconds to wait. If None, then there 557 is no limit on the wait time. 558 chunksize: The size of the chunks the iterable will be broken into 559 before being passed to a child process. This argument is only 560 used by ProcessPoolExecutor; it is ignored by 561 ThreadPoolExecutor. 562 563 Returns: 564 An iterator equivalent to: map(func, *iterables) but the calls may 565 be evaluated out-of-order. 566 567 Raises: 568 TimeoutError: If the entire result iterator could not be generated 569 before the given timeout. 570 Exception: If fn(*args) raises for any values. 571 """ 572 if timeout is not None: 573 end_time = timeout + time.monotonic() 574 575 fs = [self.submit(fn, *args) for args in zip(*iterables)] 576 577 # Yield must be hidden in closure so that the futures are submitted 578 # before the first iterator value is required. 579 def result_iterator(): 580 try: 581 # reverse to keep finishing order 582 fs.reverse() 583 while fs: 584 # Careful not to keep a reference to the popped future 585 if timeout is None: 586 yield fs.pop().result() 587 else: 588 yield fs.pop().result(end_time - time.monotonic()) 589 finally: 590 for future in fs: 591 future.cancel() 592 return result_iterator() 593 594 def shutdown(self, wait=True): 595 """Clean-up the resources associated with the Executor. 596 597 It is safe to call this method several times. Otherwise, no other 598 methods can be called after this one. 599 600 Args: 601 wait: If True then shutdown will not return until all running 602 futures have finished executing and the resources used by the 603 executor have been reclaimed. 604 """ 605 pass 606 607 def __enter__(self): 608 return self 609 610 def __exit__(self, exc_type, exc_val, exc_tb): 611 self.shutdown(wait=True) 612 return False 613 614 615class BrokenExecutor(RuntimeError): 616 """ 617 Raised when a executor has become non-functional after a severe failure. 618 """ 619