• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2009 Brian Quinlan. All Rights Reserved.
2# Licensed to PSF under a Contributor Agreement.
3
4"""Implements ThreadPoolExecutor."""
5
6__author__ = 'Brian Quinlan (brian@sweetapp.com)'
7
8from concurrent.futures import _base
9import itertools
10import queue
11import threading
12import types
13import weakref
14import os
15
16
# Maps each started worker thread to the work queue it consumes (entries are
# added in ThreadPoolExecutor._adjust_thread_count). Keys are held weakly, so
# an entry disappears once its thread object is garbage collected.
_threads_queues = weakref.WeakKeyDictionary()
# Becomes True in _python_exit(). Workers check it to decide whether to exit,
# and submit() checks it to refuse new work.
_shutdown = False
# Lock that ensures that new workers are not created while the interpreter is
# shutting down. Must be held while mutating _threads_queues and _shutdown.
_global_shutdown_lock = threading.Lock()
22
def _python_exit():
    """Flag interpreter shutdown, then wake and join every known worker.

    The module-level ``_shutdown`` flag is flipped while holding
    ``_global_shutdown_lock``.  Afterwards each registered work queue gets a
    ``None`` wake-up item, and only once all of them have been signalled are
    the worker threads joined.
    """
    global _shutdown
    with _global_shutdown_lock:
        _shutdown = True

    # Work on a snapshot: the weak dictionary may lose entries while we loop.
    snapshot = list(_threads_queues.items())

    # Signal every queue first so the workers can wind down in parallel ...
    for _, work_queue in snapshot:
        work_queue.put(None)
    # ... and only then block until each of them has finished.
    for thread, _ in snapshot:
        thread.join()
32
# Register for `_python_exit()` to be called just before joining all
# non-daemon threads. This is used instead of `atexit.register()` for
# compatibility with subinterpreters, which no longer support daemon threads.
# See bpo-39812 for context.
threading._register_atexit(_python_exit)

# At fork, reinitialize the `_global_shutdown_lock` lock in the child process.
# The lock is acquired before the fork and released again in the parent; the
# child additionally empties `_threads_queues`. Nothing is registered on
# platforms where `os.register_at_fork` does not exist.
if hasattr(os, 'register_at_fork'):
    os.register_at_fork(before=_global_shutdown_lock.acquire,
                        after_in_child=_global_shutdown_lock._at_fork_reinit,
                        after_in_parent=_global_shutdown_lock.release)
    os.register_at_fork(after_in_child=_threads_queues.clear)
45
46
class _WorkItem:
    """A callable and its arguments, paired with the future to complete."""

    # Allows the class to be subscripted, e.g. ``_WorkItem[int]``.
    __class_getitem__ = classmethod(types.GenericAlias)

    def __init__(self, future, fn, args, kwargs):
        """Store the future together with the call it stands for."""
        self.future, self.fn = future, fn
        self.args, self.kwargs = args, kwargs

    def run(self):
        """Invoke the stored call and publish its outcome on the future.

        Nothing is executed when the future reports that it was cancelled
        (``set_running_or_notify_cancel()`` returns a false value).  Any
        exception raised by the call, including ``BaseException``
        subclasses, is stored on the future rather than propagated.
        """
        if self.future.set_running_or_notify_cancel():
            try:
                outcome = self.fn(*self.args, **self.kwargs)
            except BaseException as error:
                self.future.set_exception(error)
                # Break a reference cycle with the exception 'error'
                self = None
            else:
                self.future.set_result(outcome)
68
69
def _worker(executor_reference, work_queue, initializer, initargs):
    """Thread target: initialize once, then run work items until told to stop.

    Args:
        executor_reference: Zero-argument callable returning the owning
            executor, or None once that executor is gone.
        work_queue: Queue delivering work items; a None item is a wake-up
            signal rather than work.
        initializer: Optional callable run once before any work item.
        initargs: Positional arguments passed to the initializer.

    If the initializer raises, the failure is logged, the executor (when
    still alive) is notified through ``_initializer_failed()`` and the
    worker returns without processing any work.  Any exception escaping the
    main loop is logged instead of being propagated.
    """
    if initializer is not None:
        try:
            initializer(*initargs)
        except BaseException:
            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
            owner = executor_reference()
            if owner is not None:
                owner._initializer_failed()
            return
    try:
        while True:
            try:
                item = work_queue.get_nowait()
            except queue.Empty:
                # Nothing queued: try to advertise this thread as idle, then
                # block until something arrives.
                owner = executor_reference()
                if owner is not None:
                    owner._idle_semaphore.release()
                # Do not keep the executor alive while blocked on the queue.
                del owner
                item = work_queue.get(block=True)

            if item is None:
                # Wake-up signal. Leave the loop when
                #   - the interpreter is shutting down, or
                #   - the owning executor has been collected, or
                #   - the owning executor has been shut down.
                owner = executor_reference()
                if _shutdown or owner is None or owner._shutdown:
                    # Flag the executor as shutting down as early as possible
                    # if it is not gc-ed yet.
                    if owner is not None:
                        owner._shutdown = True
                    # Pass the signal on so the other workers notice too.
                    work_queue.put(None)
                    return
                del owner
            else:
                item.run()
                # Delete references to object. See GH-60488
                del item
    except BaseException:
        _base.LOGGER.critical('Exception in worker', exc_info=True)
114
115
class BrokenThreadPool(_base.BrokenExecutor):
    """Error signalling that a ThreadPoolExecutor worker failed to initialize.
    """
120
121
class ThreadPoolExecutor(_base.Executor):
    """Executor that runs submitted calls on lazily created worker threads."""

    # Used to assign unique thread names when thread_name_prefix is not supplied.
    _counter = itertools.count().__next__

    def __init__(self, max_workers=None, thread_name_prefix='',
                 initializer=None, initargs=()):
        """Initializes a new ThreadPoolExecutor instance.

        Args:
            max_workers: Upper bound on the number of worker threads. When
                omitted, min(32, process CPU count + 4) is used.
            thread_name_prefix: Optional prefix for worker thread names; a
                unique "ThreadPoolExecutor-N" prefix is generated if empty.
            initializer: Optional callable run at the start of every worker
                thread.
            initargs: Tuple of positional arguments for the initializer.

        Raises:
            ValueError: If max_workers is not greater than 0.
            TypeError: If initializer is neither None nor callable.
        """
        if max_workers is None:
            # ThreadPoolExecutor is often used to:
            # * CPU bound task which releases GIL
            # * I/O bound task (which releases GIL, of course)
            #
            # We use process_cpu_count + 4 for both types of tasks.
            # But we limit it to 32 to avoid consuming surprisingly large
            # resource on many core machine.
            cpu_total = os.process_cpu_count() or 1
            max_workers = min(32, cpu_total + 4)
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        if not (initializer is None or callable(initializer)):
            raise TypeError("initializer must be a callable")

        # Only consume a counter value when no prefix was supplied.
        if not thread_name_prefix:
            thread_name_prefix = "ThreadPoolExecutor-%d" % self._counter()

        self._max_workers = max_workers
        self._work_queue = queue.SimpleQueue()
        # Counts workers that announced themselves as idle.
        self._idle_semaphore = threading.Semaphore(0)
        self._threads = set()
        # False, or an explanatory message once an initializer has failed.
        self._broken = False
        self._shutdown = False
        self._shutdown_lock = threading.Lock()
        self._thread_name_prefix = thread_name_prefix
        self._initializer = initializer
        self._initargs = initargs

    def submit(self, fn, /, *args, **kwargs):
        with self._shutdown_lock:
            with _global_shutdown_lock:
                if self._broken:
                    raise BrokenThreadPool(self._broken)

                if self._shutdown:
                    raise RuntimeError(
                        'cannot schedule new futures after shutdown')
                if _shutdown:
                    raise RuntimeError('cannot schedule new futures after '
                                       'interpreter shutdown')

                future = _base.Future()
                self._work_queue.put(_WorkItem(future, fn, args, kwargs))
                self._adjust_thread_count()
                return future
    submit.__doc__ = _base.Executor.submit.__doc__

    def _adjust_thread_count(self):
        """Start one more worker thread unless an idle one can take the work.

        No thread is started when an idle worker could be claimed from the
        semaphore or when the pool already holds ``_max_workers`` threads.
        """
        # if idle threads are available, don't spin new threads
        if self._idle_semaphore.acquire(timeout=0):
            return

        thread_total = len(self._threads)
        if thread_total >= self._max_workers:
            return

        pending = self._work_queue

        # When the executor gets lost, the weakref callback will wake up
        # the worker threads. The queue is bound as a default argument so
        # the callback does not refer to the executor itself.
        def wake_workers(_, q=pending):
            q.put(None)

        worker_name = '%s_%d' % (self._thread_name_prefix or self,
                                 thread_total)
        worker_args = (weakref.ref(self, wake_workers),
                       pending,
                       self._initializer,
                       self._initargs)
        worker = threading.Thread(name=worker_name, target=_worker,
                                  args=worker_args)
        worker.start()
        self._threads.add(worker)
        _threads_queues[worker] = pending

    def _initializer_failed(self):
        """Mark the pool as broken and fail every future still queued."""
        with self._shutdown_lock:
            self._broken = ('A thread initializer failed, the thread pool '
                            'is not usable anymore')
            # Drain work queue and mark pending futures failed
            while True:
                try:
                    item = self._work_queue.get_nowait()
                except queue.Empty:
                    return
                if item is None:
                    continue
                item.future.set_exception(BrokenThreadPool(self._broken))

    def shutdown(self, wait=True, *, cancel_futures=False):
        with self._shutdown_lock:
            self._shutdown = True
            if cancel_futures:
                # Drain all work items from the queue, and then cancel their
                # associated futures.
                while True:
                    try:
                        item = self._work_queue.get_nowait()
                    except queue.Empty:
                        break
                    if item is None:
                        continue
                    item.future.cancel()

            # Send a wake-up to prevent threads calling
            # _work_queue.get(block=True) from permanently blocking.
            self._work_queue.put(None)
        if not wait:
            return
        for worker in self._threads:
            worker.join()
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
241