1# Copyright 2009 Brian Quinlan. All Rights Reserved. 2# Licensed to PSF under a Contributor Agreement. 3 4"""Implements ThreadPoolExecutor.""" 5 6import atexit 7from concurrent.futures import _base 8import itertools 9import Queue as queue 10import threading 11import weakref 12import sys 13 14try: 15 from multiprocessing import cpu_count 16except ImportError: 17 # some platforms don't have multiprocessing 18 def cpu_count(): 19 return None 20 21__author__ = 'Brian Quinlan (brian@sweetapp.com)' 22 23# Workers are created as daemon threads. This is done to allow the interpreter 24# to exit when there are still idle threads in a ThreadPoolExecutor's thread 25# pool (i.e. shutdown() was not called). However, allowing workers to die with 26# the interpreter has two undesirable properties: 27# - The workers would still be running during interpretor shutdown, 28# meaning that they would fail in unpredictable ways. 29# - The workers could be killed while evaluating a work item, which could 30# be bad if the callable being evaluated has external side-effects e.g. 31# writing to a file. 32# 33# To work around this problem, an exit handler is installed which tells the 34# workers to exit when their work queues are empty and then waits until the 35# threads finish. 
# Maps each live worker thread to its work queue. A WeakKeyDictionary so dead
# threads drop out automatically; used by the atexit hook to reach every queue.
_threads_queues = weakref.WeakKeyDictionary()
# Set True by _python_exit() so workers know the interpreter is going away.
_shutdown = False

def _python_exit():
    """atexit hook: ask every worker to drain-and-exit, then wait for them."""
    global _shutdown
    _shutdown = True
    items = list(_threads_queues.items()) if _threads_queues else ()
    for t, q in items:
        # One None sentinel per queue is enough: each exiting worker re-posts
        # it (see _worker) so the sentinel eventually reaches every thread.
        q.put(None)
    for t, q in items:
        # sys.maxint as an explicit timeout rather than a bare join() —
        # presumably to keep the join interruptible on Python 2; TODO confirm.
        t.join(sys.maxint)

atexit.register(_python_exit)

class _WorkItem(object):
    """Pairs a Future with the callable whose result will fulfil it."""

    def __init__(self, future, fn, args, kwargs):
        self.future = future  # the _base.Future to complete
        self.fn = fn          # callable to run
        self.args = args      # positional args for fn
        self.kwargs = kwargs  # keyword args for fn

    def run(self):
        """Execute the callable and deliver its outcome into the future."""
        # False means the future was cancelled before it started running;
        # nothing to do in that case.
        if not self.future.set_running_or_notify_cancel():
            return

        try:
            result = self.fn(*self.args, **self.kwargs)
        except:
            # Bare except on purpose: ANY exception raised by fn (not just
            # Exception subclasses) must be captured and handed to the waiter.
            # set_exception_info is the futures-backport API that also keeps
            # the traceback object for re-raising with context on Python 2.
            e, tb = sys.exc_info()[1:]
            self.future.set_exception_info(e, tb)
        else:
            self.future.set_result(result)

def _worker(executor_reference, work_queue):
    """Main loop of one pool thread: pull work items until told to exit.

    executor_reference is a weakref to the owning executor, so the executor
    can be garbage collected while workers block on the queue. A None item
    on the queue is the wake-up/exit sentinel.
    """
    try:
        while True:
            work_item = work_queue.get(block=True)
            if work_item is not None:
                work_item.run()
                # Delete references to object. See issue16284
                del work_item
                continue
            # Got the None sentinel: decide whether to exit or keep serving.
            executor = executor_reference()
            # Exit if:
            #   - The interpreter is shutting down OR
            #   - The executor that owns the worker has been collected OR
            #   - The executor that owns the worker has been shutdown.
            if _shutdown or executor is None or executor._shutdown:
                # Notice other workers: re-post the sentinel so sibling
                # threads also wake up and exit.
                work_queue.put(None)
                return
            # Drop the strong reference before blocking again, otherwise the
            # executor could never be collected while this worker waits.
            del executor
    except:
        # Last-resort guard: a worker must never die silently.
        _base.LOGGER.critical('Exception in worker', exc_info=True)


class ThreadPoolExecutor(_base.Executor):
    """Executor that runs submitted callables in a pool of daemon threads."""

    # Used to assign unique thread names when thread_name_prefix is not supplied.
    # `.next` is the Python 2 spelling of next(itertools.count()).
    _counter = itertools.count().next

    def __init__(self, max_workers=None, thread_name_prefix=''):
        """Initializes a new ThreadPoolExecutor instance.

        Args:
            max_workers: The maximum number of threads that can be used to
                execute the given calls.
            thread_name_prefix: An optional name prefix to give our threads.

        Raises:
            ValueError: If max_workers is not strictly positive.
        """
        if max_workers is None:
            # Use this number because ThreadPoolExecutor is often
            # used to overlap I/O instead of CPU work.
            # (cpu_count() may return None on platforms lacking
            # multiprocessing; fall back to treating that as 1 core.)
            max_workers = (cpu_count() or 1) * 5
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        self._max_workers = max_workers
        self._work_queue = queue.Queue()      # unbounded FIFO shared by all workers
        self._threads = set()                 # every thread started for this pool
        self._shutdown = False                # guarded by _shutdown_lock
        self._shutdown_lock = threading.Lock()
        self._thread_name_prefix = (thread_name_prefix or
                                    ("ThreadPoolExecutor-%d" % self._counter()))

    def submit(self, fn, *args, **kwargs):
        # The lock serializes submit() against shutdown(): once _shutdown is
        # observed True, no new work can ever be queued.
        with self._shutdown_lock:
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._work_queue.put(w)
            self._adjust_thread_count()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def _adjust_thread_count(self):
        # Lazily grow the pool: at most one new thread per submit() call,
        # until _max_workers threads exist.
        # When the executor gets lost, the weakref callback will wake up
        # the worker threads.
        def weakref_cb(_, q=self._work_queue):
            # The queue is bound as a default argument so the callback holds
            # no reference back to the executor being collected.
            q.put(None)
        # TODO(bquinlan): Should avoid creating new threads if there are more
        # idle threads than items in the work queue.
        num_threads = len(self._threads)
        if num_threads < self._max_workers:
            thread_name = '%s_%d' % (self._thread_name_prefix or self,
                                     num_threads)
            t = threading.Thread(name=thread_name, target=_worker,
                                 args=(weakref.ref(self, weakref_cb),
                                       self._work_queue))
            # Daemon threads: see the module-level comment for why, and how
            # the atexit hook compensates.
            t.daemon = True
            t.start()
            self._threads.add(t)
            _threads_queues[t] = self._work_queue

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown = True
            # A single sentinel suffices: each exiting worker re-posts it
            # (see _worker) so all threads eventually see it.
            self._work_queue.put(None)
        if wait:
            for t in self._threads:
                # Explicit sys.maxint timeout, matching _python_exit() —
                # presumably keeps the join interruptible on Python 2.
                t.join(sys.maxint)
    shutdown.__doc__ = _base.Executor.shutdown.__doc__