#
# Module providing the `Pool` class for managing a process pool
#
# multiprocessing/pool.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

__all__ = ['Pool', 'ThreadPool']

#
# Imports
#

import threading
import queue
import itertools
import collections
import os
import time
import traceback

# If threading is available then ThreadPool should be provided.  Therefore
# we avoid top-level imports which are liable to fail on some systems.
from . import util
from . import get_context, TimeoutError

#
# Constants representing the state of a pool
#

RUN = 0
CLOSE = 1
TERMINATE = 2

#
# Miscellaneous
#

job_counter = itertools.count()

def mapstar(args):
    return list(map(*args))

def starmapstar(args):
    return list(itertools.starmap(args[0], args[1]))

#
# Hack to embed stringification of remote traceback in local traceback
#

class RemoteTraceback(Exception):
    def __init__(self, tb):
        self.tb = tb
    def __str__(self):
        return self.tb

class ExceptionWithTraceback:
    def __init__(self, exc, tb):
        tb = traceback.format_exception(type(exc), exc, tb)
        tb = ''.join(tb)
        self.exc = exc
        self.tb = '\n"""\n%s"""' % tb
    def __reduce__(self):
        return rebuild_exc, (self.exc, self.tb)

def rebuild_exc(exc, tb):
    exc.__cause__ = RemoteTraceback(tb)
    return exc

#
# Code run by worker processes
#

class MaybeEncodingError(Exception):
    """Wraps possible unpickleable errors, so they can be
    safely sent through the socket."""

    def __init__(self, exc, value):
        self.exc = repr(exc)
        self.value = repr(value)
        super(MaybeEncodingError, self).__init__(self.exc, self.value)

    def __str__(self):
        return "Error sending result: '%s'. Reason: '%s'" % (self.value,
                                                             self.exc)

    def __repr__(self):
        return "<%s: %s>" % (self.__class__.__name__, self)


def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,
           wrap_exception=False):
    if (maxtasks is not None) and not (isinstance(maxtasks, int)
                                       and maxtasks >= 1):
        raise AssertionError("Maxtasks {!r} is not valid".format(maxtasks))
    put = outqueue.put
    get = inqueue.get
    if hasattr(inqueue, '_writer'):
        inqueue._writer.close()
        outqueue._reader.close()

    if initializer is not None:
        initializer(*initargs)

    completed = 0
    while maxtasks is None or (maxtasks and completed < maxtasks):
        try:
            task = get()
        except (EOFError, OSError):
            util.debug('worker got EOFError or OSError -- exiting')
            break

        if task is None:
            util.debug('worker got sentinel -- exiting')
            break

        job, i, func, args, kwds = task
        try:
            result = (True, func(*args, **kwds))
        except Exception as e:
            if wrap_exception and func is not _helper_reraises_exception:
                e = ExceptionWithTraceback(e, e.__traceback__)
            result = (False, e)
        try:
            put((job, i, result))
        except Exception as e:
            wrapped = MaybeEncodingError(e, result[1])
            util.debug("Possible encoding error while sending result: %s" % (
                wrapped))
            put((job, i, (False, wrapped)))

        task = job = result = func = args = kwds = None
        completed += 1
    util.debug('worker exiting after %d tasks' % completed)

def _helper_reraises_exception(ex):
    'Pickle-able helper function for use by _guarded_task_generation.'
    raise ex

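# Illustrative sketch (not part of the original module) of how the classes
# above cooperate.  ExceptionWithTraceback pickles itself via __reduce__, so
# unpickling on the parent side calls rebuild_exc(), which attaches the
# worker's formatted traceback as a RemoteTraceback in __cause__.  The helper
# below is hypothetical and exists only to demonstrate the round trip; it is
# never called at import time.
def _example_remote_traceback_roundtrip():
    import pickle
    try:
        1 / 0
    except Exception as e:
        wrapped = ExceptionWithTraceback(e, e.__traceback__)
    # Simulate the result crossing the process boundary.
    rebuilt = pickle.loads(pickle.dumps(wrapped))
    assert isinstance(rebuilt, ZeroDivisionError)
    assert isinstance(rebuilt.__cause__, RemoteTraceback)
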
#
# Class representing a process pool
#

class Pool(object):
    '''
    Class which supports an async version of applying functions to arguments.
    '''
    _wrap_exception = True

    def Process(self, *args, **kwds):
        return self._ctx.Process(*args, **kwds)

    def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None, context=None):
        self._ctx = context or get_context()
        self._setup_queues()
        self._taskqueue = queue.SimpleQueue()
        self._cache = {}
        self._state = RUN
        self._maxtasksperchild = maxtasksperchild
        self._initializer = initializer
        self._initargs = initargs

        if processes is None:
            processes = os.cpu_count() or 1
        if processes < 1:
            raise ValueError("Number of processes must be at least 1")

        if initializer is not None and not callable(initializer):
            raise TypeError('initializer must be a callable')

        self._processes = processes
        self._pool = []
        self._repopulate_pool()

        self._worker_handler = threading.Thread(
            target=Pool._handle_workers,
            args=(self, )
            )
        self._worker_handler.daemon = True
        self._worker_handler._state = RUN
        self._worker_handler.start()

        self._task_handler = threading.Thread(
            target=Pool._handle_tasks,
            args=(self._taskqueue, self._quick_put, self._outqueue,
                  self._pool, self._cache)
            )
        self._task_handler.daemon = True
        self._task_handler._state = RUN
        self._task_handler.start()

        self._result_handler = threading.Thread(
            target=Pool._handle_results,
            args=(self._outqueue, self._quick_get, self._cache)
            )
        self._result_handler.daemon = True
        self._result_handler._state = RUN
        self._result_handler.start()

        self._terminate = util.Finalize(
            self, self._terminate_pool,
            args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
                  self._worker_handler, self._task_handler,
                  self._result_handler, self._cache),
            exitpriority=15
            )

    def _join_exited_workers(self):
        """Cleanup after any worker processes which have exited due to
        reaching their specified lifetime.  Returns True if any workers were
        cleaned up.
        """
        cleaned = False
        for i in reversed(range(len(self._pool))):
            worker = self._pool[i]
            if worker.exitcode is not None:
                # worker exited
                util.debug('cleaning up worker %d' % i)
                worker.join()
                cleaned = True
                del self._pool[i]
        return cleaned

    def _repopulate_pool(self):
        """Bring the number of pool processes up to the specified number,
        for use after reaping workers which have exited.
        """
        for i in range(self._processes - len(self._pool)):
            w = self.Process(target=worker,
                             args=(self._inqueue, self._outqueue,
                                   self._initializer,
                                   self._initargs, self._maxtasksperchild,
                                   self._wrap_exception)
                             )
            self._pool.append(w)
            w.name = w.name.replace('Process', 'PoolWorker')
            w.daemon = True
            w.start()
            util.debug('added worker')

    def _maintain_pool(self):
        """Clean up any exited workers and start replacements for them.
        """
        if self._join_exited_workers():
            self._repopulate_pool()

    def _setup_queues(self):
        self._inqueue = self._ctx.SimpleQueue()
        self._outqueue = self._ctx.SimpleQueue()
        self._quick_put = self._inqueue._writer.send
        self._quick_get = self._outqueue._reader.recv

    def apply(self, func, args=(), kwds={}):
        '''
        Equivalent of `func(*args, **kwds)`.
        Pool must be running.
        '''
        return self.apply_async(func, args, kwds).get()

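    # Usage sketch (comment only, not part of the original source): apply()
    # blocks until its single task finishes, so submitting work concurrently
    # requires apply_async().  For example:
    #
    #     with Pool(2) as p:
    #         p.apply(pow, (2, 10))            # returns 1024, synchronously
    #         r = p.apply_async(pow, (2, 10))  # returns an ApplyResult
    #         r.get(timeout=5)                 # 1024, or TimeoutError
    #
    # The callable and its arguments must be picklable, since they travel
    # through self._inqueue to the worker processes.
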
    def map(self, func, iterable, chunksize=None):
        '''
        Apply `func` to each element in `iterable`, collecting the results
        in a list that is returned.
        '''
        return self._map_async(func, iterable, mapstar, chunksize).get()

    def starmap(self, func, iterable, chunksize=None):
        '''
        Like `map()` method but the elements of the `iterable` are expected
        to be iterables as well and will be unpacked as arguments.  Hence
        `func` and (a, b) becomes func(a, b).
        '''
        return self._map_async(func, iterable, starmapstar, chunksize).get()

    def starmap_async(self, func, iterable, chunksize=None, callback=None,
                      error_callback=None):
        '''
        Asynchronous version of `starmap()` method.
        '''
        return self._map_async(func, iterable, starmapstar, chunksize,
                               callback, error_callback)

    def _guarded_task_generation(self, result_job, func, iterable):
        '''Provides a generator of tasks for imap and imap_unordered with
        appropriate handling for iterables which throw exceptions during
        iteration.'''
        try:
            i = -1
            for i, x in enumerate(iterable):
                yield (result_job, i, func, (x,), {})
        except Exception as e:
            yield (result_job, i+1, _helper_reraises_exception, (e,), {})

    def imap(self, func, iterable, chunksize=1):
        '''
        Equivalent of `map()` -- can be MUCH slower than `Pool.map()`.
        '''
        if self._state != RUN:
            raise ValueError("Pool not running")
        if chunksize == 1:
            result = IMapIterator(self._cache)
            self._taskqueue.put(
                (
                    self._guarded_task_generation(result._job, func, iterable),
                    result._set_length
                ))
            return result
        else:
            if chunksize < 1:
                raise ValueError(
                    "Chunksize must be 1+, not {0:n}".format(
                        chunksize))
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapIterator(self._cache)
            self._taskqueue.put(
                (
                    self._guarded_task_generation(result._job,
                                                  mapstar,
                                                  task_batches),
                    result._set_length
                ))
            return (item for chunk in result for item in chunk)

    def imap_unordered(self, func, iterable, chunksize=1):
        '''
        Like `imap()` method but ordering of results is arbitrary.
        '''
        if self._state != RUN:
            raise ValueError("Pool not running")
        if chunksize == 1:
            result = IMapUnorderedIterator(self._cache)
            self._taskqueue.put(
                (
                    self._guarded_task_generation(result._job, func, iterable),
                    result._set_length
                ))
            return result
        else:
            if chunksize < 1:
                raise ValueError(
                    "Chunksize must be 1+, not {0!r}".format(chunksize))
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapUnorderedIterator(self._cache)
            self._taskqueue.put(
                (
                    self._guarded_task_generation(result._job,
                                                  mapstar,
                                                  task_batches),
                    result._set_length
                ))
            return (item for chunk in result for item in chunk)

    def apply_async(self, func, args=(), kwds={}, callback=None,
                    error_callback=None):
        '''
        Asynchronous version of `apply()` method.
        '''
        if self._state != RUN:
            raise ValueError("Pool not running")
        result = ApplyResult(self._cache, callback, error_callback)
        self._taskqueue.put(([(result._job, 0, func, args, kwds)], None))
        return result

    def map_async(self, func, iterable, chunksize=None, callback=None,
                  error_callback=None):
        '''
        Asynchronous version of `map()` method.
        '''
        return self._map_async(func, iterable, mapstar, chunksize, callback,
                               error_callback)

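    # Sketch of the chunking trade-off (comment only, not original source):
    # map() ships `chunksize` items per task, so a larger chunksize amortizes
    # IPC overhead, while imap()/imap_unordered() with the default
    # chunksize=1 yield results as they arrive at the cost of one queue
    # round trip per item.  For example:
    #
    #     with Pool(4) as p:
    #         list(p.imap(abs, range(-1000, 0)))                # 1000 tasks
    #         list(p.imap(abs, range(-1000, 0), chunksize=64))  # 16 tasks
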
    def _map_async(self, func, iterable, mapper, chunksize=None,
                   callback=None, error_callback=None):
        '''
        Helper function to implement map, starmap and their async
        counterparts.
        '''
        if self._state != RUN:
            raise ValueError("Pool not running")
        if not hasattr(iterable, '__len__'):
            iterable = list(iterable)

        if chunksize is None:
            chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
            if extra:
                chunksize += 1
        if len(iterable) == 0:
            chunksize = 0

        task_batches = Pool._get_tasks(func, iterable, chunksize)
        result = MapResult(self._cache, chunksize, len(iterable), callback,
                           error_callback=error_callback)
        self._taskqueue.put(
            (
                self._guarded_task_generation(result._job,
                                              mapper,
                                              task_batches),
                None
            )
        )
        return result

    @staticmethod
    def _handle_workers(pool):
        thread = threading.current_thread()

        # Keep maintaining workers until the cache gets drained, unless the
        # pool is terminated.
        while thread._state == RUN or (pool._cache and
                                       thread._state != TERMINATE):
            pool._maintain_pool()
            time.sleep(0.1)
        # send sentinel to stop workers
        pool._taskqueue.put(None)
        util.debug('worker handler exiting')

    @staticmethod
    def _handle_tasks(taskqueue, put, outqueue, pool, cache):
        thread = threading.current_thread()

        for taskseq, set_length in iter(taskqueue.get, None):
            task = None
            try:
                # iterating taskseq cannot fail
                for task in taskseq:
                    if thread._state:
                        util.debug('task handler found thread._state != RUN')
                        break
                    try:
                        put(task)
                    except Exception as e:
                        job, idx = task[:2]
                        try:
                            cache[job]._set(idx, (False, e))
                        except KeyError:
                            pass
                else:
                    if set_length:
                        util.debug('doing set_length()')
                        idx = task[1] if task else -1
                        set_length(idx + 1)
                    continue
                break
            finally:
                task = taskseq = job = None
        else:
            util.debug('task handler got sentinel')

        try:
            # tell result handler to finish when cache is empty
            util.debug('task handler sending sentinel to result handler')
            outqueue.put(None)

            # tell workers there is no more work
            util.debug('task handler sending sentinel to workers')
            for p in pool:
                put(None)
        except OSError:
            util.debug('task handler got OSError when sending sentinels')

        util.debug('task handler exiting')

    @staticmethod
    def _handle_results(outqueue, get, cache):
        thread = threading.current_thread()

        while 1:
            try:
                task = get()
            except (OSError, EOFError):
                util.debug('result handler got EOFError/OSError -- exiting')
                return

            if thread._state:
                assert thread._state == TERMINATE, "Thread not in TERMINATE"
                util.debug('result handler found thread._state=TERMINATE')
                break

            if task is None:
                util.debug('result handler got sentinel')
                break

            job, i, obj = task
            try:
                cache[job]._set(i, obj)
            except KeyError:
                pass
            task = job = obj = None

        while cache and thread._state != TERMINATE:
            try:
                task = get()
            except (OSError, EOFError):
                util.debug('result handler got EOFError/OSError -- exiting')
                return

            if task is None:
                util.debug('result handler ignoring extra sentinel')
                continue
            job, i, obj = task
            try:
                cache[job]._set(i, obj)
            except KeyError:
                pass
            task = job = obj = None

        if hasattr(outqueue, '_reader'):
            util.debug('ensuring that outqueue is not full')
            # If we don't make room available in outqueue then
            # attempts to add the sentinel (None) to outqueue may
            # block.  There is guaranteed to be no more than 2 sentinels.
            try:
                for i in range(10):
                    if not outqueue._reader.poll():
                        break
                    get()
            except (OSError, EOFError):
                pass

        util.debug('result handler exiting: len(cache)=%s, thread._state=%s',
                   len(cache), thread._state)

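    # Architecture note (comment only, not original source): a Pool runs
    # three daemon threads in the parent process.  _handle_workers replaces
    # dead workers, _handle_tasks feeds pickled tasks into _inqueue, and
    # _handle_results routes (job, i, result) triples from _outqueue back
    # into the matching ApplyResult/MapResult via cache[job]._set().  None
    # serves as the shutdown sentinel on every queue, which is why each
    # handler above treats a None task as "stop".
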
    @staticmethod
    def _get_tasks(func, it, size):
        it = iter(it)
        while 1:
            x = tuple(itertools.islice(it, size))
            if not x:
                return
            yield (func, x)

    def __reduce__(self):
        raise NotImplementedError(
            'pool objects cannot be passed between processes or pickled'
            )

    def close(self):
        util.debug('closing pool')
        if self._state == RUN:
            self._state = CLOSE
            self._worker_handler._state = CLOSE

    def terminate(self):
        util.debug('terminating pool')
        self._state = TERMINATE
        self._worker_handler._state = TERMINATE
        self._terminate()

    def join(self):
        util.debug('joining pool')
        if self._state == RUN:
            raise ValueError("Pool is still running")
        elif self._state not in (CLOSE, TERMINATE):
            raise ValueError("In unknown state")
        self._worker_handler.join()
        self._task_handler.join()
        self._result_handler.join()
        for p in self._pool:
            p.join()

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # task_handler may be blocked trying to put items on inqueue
        util.debug('removing tasks from inqueue until task handler finished')
        inqueue._rlock.acquire()
        while task_handler.is_alive() and inqueue._reader.poll():
            inqueue._reader.recv()
            time.sleep(0)

    @classmethod
    def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
                        worker_handler, task_handler, result_handler, cache):
        # this is guaranteed to only be called once
        util.debug('finalizing pool')

        worker_handler._state = TERMINATE
        task_handler._state = TERMINATE

        util.debug('helping task handler/workers to finish')
        cls._help_stuff_finish(inqueue, task_handler, len(pool))

        if (not result_handler.is_alive()) and (len(cache) != 0):
            raise AssertionError(
                "Cannot have cache with result_handler not alive")

        result_handler._state = TERMINATE
        outqueue.put(None)                  # sentinel

        # We must wait for the worker handler to exit before terminating
        # workers because we don't want workers to be restarted behind
        # our back.
        util.debug('joining worker handler')
        if threading.current_thread() is not worker_handler:
            worker_handler.join()

        # Terminate workers which haven't already finished.
        if pool and hasattr(pool[0], 'terminate'):
            util.debug('terminating workers')
            for p in pool:
                if p.exitcode is None:
                    p.terminate()

        util.debug('joining task handler')
        if threading.current_thread() is not task_handler:
            task_handler.join()

        util.debug('joining result handler')
        if threading.current_thread() is not result_handler:
            result_handler.join()

        if pool and hasattr(pool[0], 'terminate'):
            util.debug('joining pool workers')
            for p in pool:
                if p.is_alive():
                    # worker has not yet exited
                    util.debug('cleaning up worker %d' % p.pid)
                    p.join()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.terminate()

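# Illustrative lifecycle sketch (not part of the original module).  It
# contrasts the orderly close()/join() shutdown with the terminate() path
# used by the context manager.  The function is hypothetical and never runs
# at import time; with a process-backed Pool it should only be invoked from
# under an `if __name__ == '__main__':` guard so that child processes can
# safely re-import the main module.
def _example_pool_lifecycle():
    # Orderly shutdown: close() stops new submissions, join() waits for
    # the workers and handler threads to exit.
    p = Pool(processes=2)
    results = p.map(abs, range(-5, 5))
    p.close()
    p.join()

    # The context manager instead calls terminate() on exit, which kills
    # any workers that are still running.
    with Pool(processes=2) as p:
        results = p.map(abs, range(-5, 5))
    return results
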
#
# Class whose instances are returned by `Pool.apply_async()`
#

class ApplyResult(object):

    def __init__(self, cache, callback, error_callback):
        self._event = threading.Event()
        self._job = next(job_counter)
        self._cache = cache
        self._callback = callback
        self._error_callback = error_callback
        cache[self._job] = self

    def ready(self):
        return self._event.is_set()

    def successful(self):
        if not self.ready():
            raise ValueError("{0!r} not ready".format(self))
        return self._success

    def wait(self, timeout=None):
        self._event.wait(timeout)

    def get(self, timeout=None):
        self.wait(timeout)
        if not self.ready():
            raise TimeoutError
        if self._success:
            return self._value
        else:
            raise self._value

    def _set(self, i, obj):
        self._success, self._value = obj
        if self._callback and self._success:
            self._callback(self._value)
        if self._error_callback and not self._success:
            self._error_callback(self._value)
        self._event.set()
        del self._cache[self._job]

AsyncResult = ApplyResult       # create alias -- see #17805

#
# Class whose instances are returned by `Pool.map_async()`
#

class MapResult(ApplyResult):

    def __init__(self, cache, chunksize, length, callback, error_callback):
        ApplyResult.__init__(self, cache, callback,
                             error_callback=error_callback)
        self._success = True
        self._value = [None] * length
        self._chunksize = chunksize
        if chunksize <= 0:
            self._number_left = 0
            self._event.set()
            del cache[self._job]
        else:
            self._number_left = length//chunksize + bool(length % chunksize)

    def _set(self, i, success_result):
        self._number_left -= 1
        success, result = success_result
        if success and self._success:
            self._value[i*self._chunksize:(i+1)*self._chunksize] = result
            if self._number_left == 0:
                if self._callback:
                    self._callback(self._value)
                del self._cache[self._job]
                self._event.set()
        else:
            if not success and self._success:
                # only store first exception
                self._success = False
                self._value = result
            if self._number_left == 0:
                # only consider the result ready once all jobs are done
                if self._error_callback:
                    self._error_callback(self._value)
                del self._cache[self._job]
                self._event.set()

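# Illustrative sketch of the result-object protocol (not part of the
# original module).  It uses ThreadPool, defined at the bottom of this file,
# because thread pools accept callables without pickling; the function is
# hypothetical and only runs if called explicitly.
def _example_async_result():
    with ThreadPool(2) as p:
        r = p.apply_async(divmod, (7, 3))
        r.wait()                     # block until _set() fires the event
        assert r.ready()             # event is set
        assert r.successful()        # no exception was raised
        assert r.get() == (2, 1)     # value stored by _set()

        bad = p.apply_async(divmod, (7, 0))
        bad.wait()
        assert not bad.successful()  # get() would re-raise ZeroDivisionError
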
#
# Class whose instances are returned by `Pool.imap()`
#

class IMapIterator(object):

    def __init__(self, cache):
        self._cond = threading.Condition(threading.Lock())
        self._job = next(job_counter)
        self._cache = cache
        self._items = collections.deque()
        self._index = 0
        self._length = None
        self._unsorted = {}
        cache[self._job] = self

    def __iter__(self):
        return self

    def next(self, timeout=None):
        with self._cond:
            try:
                item = self._items.popleft()
            except IndexError:
                if self._index == self._length:
                    raise StopIteration from None
                self._cond.wait(timeout)
                try:
                    item = self._items.popleft()
                except IndexError:
                    if self._index == self._length:
                        raise StopIteration from None
                    raise TimeoutError from None

        success, value = item
        if success:
            return value
        raise value

    __next__ = next                    # XXX

    def _set(self, i, obj):
        with self._cond:
            if self._index == i:
                self._items.append(obj)
                self._index += 1
                while self._index in self._unsorted:
                    obj = self._unsorted.pop(self._index)
                    self._items.append(obj)
                    self._index += 1
                self._cond.notify()
            else:
                self._unsorted[i] = obj

            if self._index == self._length:
                del self._cache[self._job]

    def _set_length(self, length):
        with self._cond:
            self._length = length
            if self._index == self._length:
                self._cond.notify()
                del self._cache[self._job]

#
# Class whose instances are returned by `Pool.imap_unordered()`
#

class IMapUnorderedIterator(IMapIterator):

    def _set(self, i, obj):
        with self._cond:
            self._items.append(obj)
            self._index += 1
            self._cond.notify()
            if self._index == self._length:
                del self._cache[self._job]

#
#
#

class ThreadPool(Pool):
    _wrap_exception = False

    @staticmethod
    def Process(*args, **kwds):
        from .dummy import Process
        return Process(*args, **kwds)

    def __init__(self, processes=None, initializer=None, initargs=()):
        Pool.__init__(self, processes, initializer, initargs)

    def _setup_queues(self):
        self._inqueue = queue.SimpleQueue()
        self._outqueue = queue.SimpleQueue()
        self._quick_put = self._inqueue.put
        self._quick_get = self._outqueue.get

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # drain inqueue, and put sentinels at its head to make workers finish
        try:
            while True:
                inqueue.get(block=False)
        except queue.Empty:
            pass
        for i in range(size):
            inqueue.put(None)

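# Demo guard (illustrative, not part of the original module): a minimal
# end-to-end run of imap versus imap_unordered on a ThreadPool, showing that
# imap preserves input order while imap_unordered may not.  It only executes
# when this file is run directly as a script.
if __name__ == '__main__':
    with ThreadPool(4) as pool:
        ordered = list(pool.imap(abs, [-3, -2, -1]))
        unordered = sorted(pool.imap_unordered(abs, [-3, -2, -1]))
    assert ordered == [3, 2, 1]
    assert unordered == [1, 2, 3]
    print('ThreadPool demo OK:', ordered, unordered)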