# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

__author__ = 'Brian Quinlan (brian@sweetapp.com)'

import collections
import logging
import threading
import time

FIRST_COMPLETED = 'FIRST_COMPLETED'
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
ALL_COMPLETED = 'ALL_COMPLETED'
_AS_COMPLETED = '_AS_COMPLETED'

# Possible future states (for internal use by the futures package).
PENDING = 'PENDING'
RUNNING = 'RUNNING'
# The future was cancelled by the user...
CANCELLED = 'CANCELLED'
# ...and _Waiter.add_cancelled() was called by a worker.
CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
FINISHED = 'FINISHED'

_FUTURE_STATES = [
    PENDING,
    RUNNING,
    CANCELLED,
    CANCELLED_AND_NOTIFIED,
    FINISHED
]

_STATE_TO_DESCRIPTION_MAP = {
    PENDING: "pending",
    RUNNING: "running",
    CANCELLED: "cancelled",
    CANCELLED_AND_NOTIFIED: "cancelled",
    FINISHED: "finished"
}

# Logger for internal use by the futures package.
LOGGER = logging.getLogger("concurrent.futures")


class Error(Exception):
    """Base class for all future-related exceptions."""
    pass


class CancelledError(Error):
    """The Future was cancelled."""
    pass


class TimeoutError(Error):
    """The operation exceeded the given deadline."""
    pass


class _Waiter(object):
    """Provides the event that wait() and as_completed() block on."""

    def __init__(self):
        self.event = threading.Event()
        # Futures that reported completion to this waiter, in arrival order.
        self.finished_futures = []

    def add_result(self, future):
        """Record a future that finished by returning a result."""
        self.finished_futures.append(future)

    def add_exception(self, future):
        """Record a future that finished by raising an exception."""
        self.finished_futures.append(future)

    def add_cancelled(self, future):
        """Record a future whose cancellation has been notified."""
        self.finished_futures.append(future)


class _AsCompletedWaiter(_Waiter):
    """Used by as_completed()."""

    def __init__(self):
        super(_AsCompletedWaiter, self).__init__()
        # as_completed() swaps out finished_futures and clears the event
        # under this lock, so recording and signalling must hold it too.
        self.lock = threading.Lock()

    def add_result(self, future):
        """Record the future and wake the consumer."""
        with self.lock:
            super(_AsCompletedWaiter, self).add_result(future)
            self.event.set()

    def add_exception(self, future):
        """Record the future and wake the consumer."""
        with self.lock:
            super(_AsCompletedWaiter, self).add_exception(future)
            self.event.set()

    def add_cancelled(self, future):
        """Record the future and wake the consumer."""
        with self.lock:
            super(_AsCompletedWaiter, self).add_cancelled(future)
            self.event.set()


class _FirstCompletedWaiter(_Waiter):
    """Used by wait(return_when=FIRST_COMPLETED)."""

    def add_result(self, future):
        """Record the future and signal the event."""
        super().add_result(future)
        self.event.set()

    def add_exception(self, future):
        """Record the future and signal the event."""
        super().add_exception(future)
        self.event.set()

    def add_cancelled(self, future):
        """Record the future and signal the event."""
        super().add_cancelled(future)
        self.event.set()


class _AllCompletedWaiter(_Waiter):
    """Used by wait(return_when=FIRST_EXCEPTION) and
    wait(return_when=ALL_COMPLETED).
    """

    def __init__(self, num_pending_calls, stop_on_exception):
        self.num_pending_calls = num_pending_calls
        self.stop_on_exception = stop_on_exception
        # Guards num_pending_calls.
        self.lock = threading.Lock()
        super().__init__()

    def _decrement_pending_calls(self):
        """Count one future as done; signal the event when none remain."""
        with self.lock:
            self.num_pending_calls -= 1
            if not self.num_pending_calls:
                self.event.set()

    def add_result(self, future):
        """Record the future and count it as done."""
        super().add_result(future)
        self._decrement_pending_calls()

    def add_exception(self, future):
        """Record the future; signal immediately if stop_on_exception is
        set, otherwise count it as done.
        """
        super().add_exception(future)
        if self.stop_on_exception:
            self.event.set()
        else:
            self._decrement_pending_calls()

    def add_cancelled(self, future):
        """Record the future and count it as done."""
        super().add_cancelled(future)
        self._decrement_pending_calls()


class _AcquireFutures(object):
    """A context manager that does an ordered acquire of Future conditions."""

    def __init__(self, futures):
        # Sorting by id() gives every caller the same acquisition order.
        self.futures = sorted(futures, key=id)

    def __enter__(self):
        for future in self.futures:
            future._condition.acquire()

    def __exit__(self, *args):
        for future in self.futures:
            future._condition.release()


def _create_and_install_waiters(fs, return_when):
    """Create the waiter matching return_when and attach it to every future.

    Args:
        fs: The futures to attach the waiter to. It is iterated more than
            once, so it must be a re-iterable collection.
        return_when: One of _AS_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION
            or ALL_COMPLETED.

    Returns:
        The newly created waiter, already appended to each future's
        _waiters list.

    Raises:
        ValueError: If return_when is not one of the values listed above.
    """
    if return_when == _AS_COMPLETED:
        waiter = _AsCompletedWaiter()
    elif return_when == FIRST_COMPLETED:
        waiter = _FirstCompletedWaiter()
    else:
        pending_count = sum(
            f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs)

        if return_when == FIRST_EXCEPTION:
            waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True)
        elif return_when == ALL_COMPLETED:
            waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False)
        else:
            raise ValueError("Invalid return condition: %r" % return_when)

    for f in fs:
        f._waiters.append(waiter)

    return waiter


def as_completed(fs, timeout=None):
    """An iterator over the given futures that yields each as it completes.

    Args:
        fs: The sequence of Futures (possibly created by different Executors) to
            iterate over.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.

    Returns:
        An iterator that yields the given Futures as they complete (finished or
        cancelled). If any given Futures are duplicated, they will be returned
        once.

    Raises:
        TimeoutError: If the entire result iterator could not be generated
            before the given timeout.
    """
    if timeout is not None:
        # A monotonic clock keeps the deadline immune to wall-clock changes.
        end_time = timeout + time.monotonic()

    fs = set(fs)
    with _AcquireFutures(fs):
        finished = set(
            f for f in fs
            if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
        pending = fs - finished
        waiter = _create_and_install_waiters(fs, _AS_COMPLETED)

    try:
        yield from finished

        while pending:
            if timeout is None:
                wait_timeout = None
            else:
                wait_timeout = end_time - time.monotonic()
                if wait_timeout < 0:
                    raise TimeoutError(
                        '%d (of %d) futures unfinished' % (
                            len(pending), len(fs)))

            waiter.event.wait(wait_timeout)

            # Take the batch and reset the event atomically with respect to
            # the worker threads that append to finished_futures.
            with waiter.lock:
                finished = waiter.finished_futures
                waiter.finished_futures = []
                waiter.event.clear()

            for future in finished:
                yield future
                pending.remove(future)

    finally:
        # Runs on exhaustion, timeout or generator close alike.
        for f in fs:
            with f._condition:
                f._waiters.remove(waiter)


DoneAndNotDoneFutures = collections.namedtuple(
    'DoneAndNotDoneFutures', 'done not_done')


def wait(fs, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the futures in the given sequence to complete.

    Args:
        fs: The sequence of Futures (possibly created by different Executors) to
            wait upon. Any iterable is accepted; duplicated futures are
            considered once.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.
        return_when: Indicates when this function should return. The options
            are:

            FIRST_COMPLETED - Return when any future finishes or is
                              cancelled.
            FIRST_EXCEPTION - Return when any future finishes by raising an
                              exception. If no future raises an exception
                              then it is equivalent to ALL_COMPLETED.
            ALL_COMPLETED -   Return when all futures finish or are cancelled.

    Returns:
        A named 2-tuple of sets. The first set, named 'done', contains the
        futures that completed (is finished or cancelled) before the wait
        completed. The second set, named 'not_done', contains uncompleted
        futures.

    Raises:
        ValueError: If return_when is not a recognised option and a waiter
            has to be installed.
    """
    # Normalise once: without this, duplicates make len(done) != len(fs) and
    # the call blocks on a waiter that is never signalled, and a one-shot
    # iterable is already exhausted after the first pass.
    fs = set(fs)
    with _AcquireFutures(fs):
        done = set(f for f in fs
                   if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
        not_done = fs - done

        if (return_when == FIRST_COMPLETED) and done:
            return DoneAndNotDoneFutures(done, not_done)
        elif (return_when == FIRST_EXCEPTION) and done:
            if any(not f.cancelled() and f.exception() is not None
                   for f in done):
                return DoneAndNotDoneFutures(done, not_done)

        if len(done) == len(fs):
            return DoneAndNotDoneFutures(done, not_done)

        waiter = _create_and_install_waiters(fs, return_when)

    waiter.event.wait(timeout)
    for f in fs:
        with f._condition:
            f._waiters.remove(waiter)

    done.update(waiter.finished_futures)
    return DoneAndNotDoneFutures(done, fs - done)


class Future(object):
    """Represents the result of an asynchronous computation."""

    def __init__(self):
        """Initializes the future. Should not be called by clients."""
        self._condition = threading.Condition()
        self._state = PENDING
        self._result = None
        self._exception = None
        self._waiters = []
        self._done_callbacks = []

    def _invoke_callbacks(self):
        """Call every registered done-callback; log and ignore failures."""
        for callback in self._done_callbacks:
            try:
                callback(self)
            except Exception:
                LOGGER.exception('exception calling callback for %r', self)

    def __repr__(self):
        with self._condition:
            if self._state == FINISHED:
                if self._exception:
                    return '<%s at %#x state=%s raised %s>' % (
                        self.__class__.__name__,
                        id(self),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._exception.__class__.__name__)
                else:
                    return '<%s at %#x state=%s returned %s>' % (
                        self.__class__.__name__,
                        id(self),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._result.__class__.__name__)
            return '<%s at %#x state=%s>' % (
                self.__class__.__name__,
                id(self),
                _STATE_TO_DESCRIPTION_MAP[self._state])

    def cancel(self):
        """Cancel the future if possible.

        Returns True if the future was cancelled, False otherwise. A future
        cannot be cancelled if it is running or has already completed.
        """
        with self._condition:
            if self._state in [RUNNING, FINISHED]:
                return False

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                return True

            self._state = CANCELLED
            self._condition.notify_all()

        # Callbacks run after the condition has been released.
        self._invoke_callbacks()
        return True

    def cancelled(self):
        """Return True if the future was cancelled."""
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]

    def running(self):
        """Return True if the future is currently executing."""
        with self._condition:
            return self._state == RUNNING

    def done(self):
        """Return True if the future was cancelled or finished executing."""
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]

    def __get_result(self):
        """Raise the stored exception if there is one, else return the
        stored result.
        """
        if self._exception:
            raise self._exception
        else:
            return self._result

    def add_done_callback(self, fn):
        """Attaches a callable that will be called when the future finishes.

        Args:
            fn: A callable that will be called with this future as its only
                argument when the future completes or is cancelled. The callable
                will always be called by a thread in the same process in which
                it was added. If the future has already completed or been
                cancelled then the callable will be called immediately. These
                callables are called in the order that they were added. If
                the callable raises an Exception subclass, it is logged and
                ignored.
        """
        with self._condition:
            if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
                self._done_callbacks.append(fn)
                return
        # Already done: call immediately, with the same log-and-ignore
        # handling that _invoke_callbacks() applies to deferred callbacks.
        try:
            fn(self)
        except Exception:
            LOGGER.exception('exception calling callback for %r', self)

    def result(self, timeout=None):
        """Return the result of the call that the future represents.

        Args:
            timeout: The number of seconds to wait for the result if the future
                isn't done. If None, then there is no limit on the wait time.

        Returns:
            The result of the call that the future represents.

        Raises:
            CancelledError: If the future was cancelled.
            TimeoutError: If the future didn't finish executing before the given
                timeout.
            Exception: If the call raised then that exception will be raised.
        """
        with self._condition:
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self.__get_result()

            self._condition.wait(timeout)

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self.__get_result()
            else:
                raise TimeoutError()

    def exception(self, timeout=None):
        """Return the exception raised by the call that the future represents.

        Args:
            timeout: The number of seconds to wait for the exception if the
                future isn't done. If None, then there is no limit on the wait
                time.

        Returns:
            The exception raised by the call that the future represents or None
            if the call completed without raising.

        Raises:
            CancelledError: If the future was cancelled.
            TimeoutError: If the future didn't finish executing before the given
                timeout.
        """
        with self._condition:
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self._exception

            self._condition.wait(timeout)

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self._exception
            else:
                raise TimeoutError()

    # The following methods should only be used by Executors and in tests.
    def set_running_or_notify_cancel(self):
        """Mark the future as running or process any cancel notifications.

        Should only be used by Executor implementations and unit tests.

        If the future has been cancelled (cancel() was called and returned
        True) then any threads waiting on the future completing (through calls
        to as_completed() or wait()) are notified and False is returned.

        If the future was not cancelled then it is put in the running state
        (future calls to running() will return True) and True is returned.

        This method should be called by Executor implementations before
        executing the work associated with this future. If this method returns
        False then the work should not be executed.

        Returns:
            False if the Future was cancelled, True otherwise.

        Raises:
            RuntimeError: if this method was already called or if set_result()
                or set_exception() was called.
        """
        with self._condition:
            if self._state == CANCELLED:
                self._state = CANCELLED_AND_NOTIFIED
                for waiter in self._waiters:
                    waiter.add_cancelled(self)
                # self._condition.notify_all() is not necessary because
                # self.cancel() triggers a notification.
                return False
            elif self._state == PENDING:
                self._state = RUNNING
                return True
            else:
                LOGGER.critical('Future %s in unexpected state: %s',
                                id(self),
                                self._state)
                raise RuntimeError('Future in unexpected state')

    def set_result(self, result):
        """Sets the return value of work associated with the future.

        Should only be used by Executor implementations and unit tests.
        """
        with self._condition:
            self._result = result
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_result(self)
            self._condition.notify_all()
        self._invoke_callbacks()

    def set_exception(self, exception):
        """Sets the result of the future as being the given exception.

        Should only be used by Executor implementations and unit tests.
        """
        with self._condition:
            self._exception = exception
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_exception(self)
            self._condition.notify_all()
        self._invoke_callbacks()


class Executor(object):
    """This is an abstract base class for concrete asynchronous executors."""

    def submit(self, fn, *args, **kwargs):
        """Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and returns
        a Future instance representing the execution of the callable.

        Returns:
            A Future representing the given call.
        """
        raise NotImplementedError()

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, *iterables).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: The size of the chunks the iterable will be broken into
                before being passed to a child process. This argument is only
                used by ProcessPoolExecutor; it is ignored by
                ThreadPoolExecutor.

        Returns:
            An iterator equivalent to: map(fn, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        if timeout is not None:
            # A monotonic clock keeps the deadline immune to wall-clock changes.
            end_time = timeout + time.monotonic()

        fs = [self.submit(fn, *args) for args in zip(*iterables)]

        # Yield must be hidden in closure so that the futures are submitted
        # before the first iterator value is required.
        def result_iterator():
            try:
                for future in fs:
                    if timeout is None:
                        yield future.result()
                    else:
                        yield future.result(end_time - time.monotonic())
            finally:
                # cancel() returns False for running or finished futures, so
                # only work that has not started is actually cancelled.
                for future in fs:
                    future.cancel()
        return result_iterator()

    def shutdown(self, wait=True):
        """Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Args:
            wait: If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                executor have been reclaimed.
        """
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown(wait=True)
        return False