• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.
3
"""Implements ThreadPoolExecutor."""

__author__ = 'Brian Quinlan (brian@sweetapp.com)'
7
from concurrent.futures import _base
import itertools
import queue
import threading
import types
import weakref
import os
15
16
# Maps each worker thread to the work queue it reads from. The mapping is
# weakly keyed on the thread object.
_threads_queues = weakref.WeakKeyDictionary()
# Becomes True once _python_exit() has run, i.e. interpreter shutdown began.
_shutdown = False
# Lock that ensures that new workers are not created while the interpreter is
# shutting down. Must be held while mutating _threads_queues and _shutdown.
_global_shutdown_lock = threading.Lock()
22
def _python_exit():
    """Mark the interpreter as shutting down and wait for all pool threads.

    Sets the module-level ``_shutdown`` flag under ``_global_shutdown_lock``,
    then puts a ``None`` wake-up on every registered work queue and joins
    every registered worker thread.
    """
    global _shutdown
    with _global_shutdown_lock:
        _shutdown = True
    # Snapshot the registry so both passes below see the same (thread, queue)
    # pairs and hold strong references to them.
    items = list(_threads_queues.items())
    # Wake every worker first ...
    for _, q in items:
        q.put(None)
    # ... and only then block on each of them.
    for t, _ in items:
        t.join()
32
# Register for `_python_exit()` to be called just before joining all
# non-daemon threads. This is used instead of `atexit.register()` for
# compatibility with subinterpreters, which no longer support daemon threads.
# See bpo-39812 for context.
threading._register_atexit(_python_exit)

# Fork handling for `_global_shutdown_lock` (only where the platform offers
# os.register_at_fork): the lock is acquired before forking, released again
# in the parent afterwards, and reinitialized in the child process.
if hasattr(os, 'register_at_fork'):
    os.register_at_fork(before=_global_shutdown_lock.acquire,
                        after_in_child=_global_shutdown_lock._at_fork_reinit,
                        after_in_parent=_global_shutdown_lock.release)
44
45
class _WorkItem(object):
    """A pending call bundled with the future that receives its outcome."""

    def __init__(self, future, fn, args, kwargs):
        """Store the future and the call to make.

        Args:
            future: Object that will receive the result or the exception.
            fn: The callable to invoke.
            args: Positional arguments for ``fn``.
            kwargs: Keyword arguments for ``fn``.
        """
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self):
        """Invoke the call and record its result or exception on the future.

        Does nothing when the future reports (through
        ``set_running_or_notify_cancel()``) that it must not be run.
        """
        if not self.future.set_running_or_notify_cancel():
            return

        try:
            result = self.fn(*self.args, **self.kwargs)
        except BaseException as exc:
            # Every exception, including BaseException subclasses, is handed
            # to the future instead of propagating into the worker.
            self.future.set_exception(exc)
            # Break a reference cycle with the exception 'exc'
            self = None
        else:
            self.future.set_result(result)

    # Allows subscripting the class, e.g. _WorkItem[int].
    __class_getitem__ = classmethod(types.GenericAlias)
67
68
def _worker(executor_reference, work_queue, initializer, initargs):
    """Thread target: run the initializer, then serve items from the queue.

    Args:
        executor_reference: Zero-argument callable returning the owning
            executor, or None once it is gone (ThreadPoolExecutor passes a
            weakref.ref to itself).
        work_queue: Queue delivering objects with a ``run()`` method, or
            ``None`` as a wake-up signal.
        initializer: Optional callable invoked once before any item is run.
        initargs: Positional arguments passed to ``initializer``.

    Any exception escaping the service loop is logged at CRITICAL level and
    ends the thread; it is not re-raised.
    """
    if initializer is not None:
        try:
            initializer(*initargs)
        except BaseException:
            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
            executor = executor_reference()
            if executor is not None:
                executor._initializer_failed()
            # A worker whose initializer failed never serves the queue.
            return
    try:
        while True:
            work_item = work_queue.get(block=True)
            if work_item is not None:
                work_item.run()
                # Delete references to object. See issue16284
                del work_item

                # attempt to increment idle count
                executor = executor_reference()
                if executor is not None:
                    executor._idle_semaphore.release()
                # Drop the strong reference before blocking on the queue again.
                del executor
                continue

            # A None item is a wake-up: decide whether this thread must stop.
            executor = executor_reference()
            # Exit if:
            #   - The interpreter is shutting down OR
            #   - The executor that owns the worker has been collected OR
            #   - The executor that owns the worker has been shutdown.
            if _shutdown or executor is None or executor._shutdown:
                # Flag the executor as shutting down as early as possible if it
                # is not gc-ed yet.
                if executor is not None:
                    executor._shutdown = True
                # Notice other workers
                work_queue.put(None)
                return
            del executor
    except BaseException:
        _base.LOGGER.critical('Exception in worker', exc_info=True)
110
111
class BrokenThreadPool(_base.BrokenExecutor):
    """
    Raised when a worker thread in a ThreadPoolExecutor failed initializing.
    """
116
117
class ThreadPoolExecutor(_base.Executor):
    """Executor that runs submitted calls on a pool of worker threads.

    Threads are started on demand by submit(), up to ``max_workers``.
    """

    # Used to assign unique thread names when thread_name_prefix is not supplied.
    _counter = itertools.count().__next__

    def __init__(self, max_workers=None, thread_name_prefix='',
                 initializer=None, initargs=()):
        """Initializes a new ThreadPoolExecutor instance.

        Args:
            max_workers: The maximum number of threads that can be used to
                execute the given calls.
            thread_name_prefix: An optional name prefix to give our threads.
            initializer: A callable used to initialize worker threads.
            initargs: A tuple of arguments to pass to the initializer.

        Raises:
            ValueError: If max_workers is less than or equal to 0.
            TypeError: If initializer is given but is not callable.
        """
        if max_workers is None:
            # ThreadPoolExecutor is often used to:
            # * CPU bound task which releases GIL
            # * I/O bound task (which releases GIL, of course)
            #
            # We use cpu_count + 4 for both types of tasks.
            # But we limit it to 32 to avoid consuming surprisingly large resource
            # on many core machine.
            max_workers = min(32, (os.cpu_count() or 1) + 4)
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        if initializer is not None and not callable(initializer):
            raise TypeError("initializer must be a callable")

        self._max_workers = max_workers
        self._work_queue = queue.SimpleQueue()
        # Released by a worker each time it finishes a work item; a successful
        # non-blocking acquire in _adjust_thread_count() skips thread creation.
        self._idle_semaphore = threading.Semaphore(0)
        self._threads = set()
        # False, or an explanatory message once a thread initializer failed.
        self._broken = False
        self._shutdown = False
        self._shutdown_lock = threading.Lock()
        self._thread_name_prefix = (thread_name_prefix or
                                    ("ThreadPoolExecutor-%d" % self._counter()))
        self._initializer = initializer
        self._initargs = initargs

    # The docstring is copied from the base class right after the definition.
    def submit(self, fn, /, *args, **kwargs):
        with self._shutdown_lock, _global_shutdown_lock:
            if self._broken:
                raise BrokenThreadPool(self._broken)

            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')
            if _shutdown:
                raise RuntimeError('cannot schedule new futures after '
                                   'interpreter shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            # Enqueue first, then make sure some thread exists to pick it up.
            self._work_queue.put(w)
            self._adjust_thread_count()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def _adjust_thread_count(self):
        """Start one more worker thread unless an idle one is available.

        No thread is started when the idle semaphore can be acquired without
        blocking or when ``max_workers`` threads already exist.
        """
        # if idle threads are available, don't spin new threads
        if self._idle_semaphore.acquire(timeout=0):
            return

        # When the executor gets lost, the weakref callback will wake up
        # the worker threads.
        def weakref_cb(_, q=self._work_queue):
            q.put(None)

        num_threads = len(self._threads)
        if num_threads < self._max_workers:
            thread_name = '%s_%d' % (self._thread_name_prefix or self,
                                     num_threads)
            # The worker only gets a weak reference to this executor.
            t = threading.Thread(name=thread_name, target=_worker,
                                 args=(weakref.ref(self, weakref_cb),
                                       self._work_queue,
                                       self._initializer,
                                       self._initargs))
            t.start()
            self._threads.add(t)
            # Register the thread so _python_exit() can wake and join it.
            _threads_queues[t] = self._work_queue

    def _initializer_failed(self):
        """Mark the pool broken and fail every future still in the queue."""
        with self._shutdown_lock:
            self._broken = ('A thread initializer failed, the thread pool '
                            'is not usable anymore')
            # Drain work queue and mark pending futures failed
            while True:
                try:
                    work_item = self._work_queue.get_nowait()
                except queue.Empty:
                    break
                # None entries are wake-up signals, not work items.
                if work_item is not None:
                    work_item.future.set_exception(BrokenThreadPool(self._broken))

    # The docstring is copied from the base class right after the definition.
    def shutdown(self, wait=True, *, cancel_futures=False):
        with self._shutdown_lock:
            self._shutdown = True
            if cancel_futures:
                # Drain all work items from the queue, and then cancel their
                # associated futures.
                while True:
                    try:
                        work_item = self._work_queue.get_nowait()
                    except queue.Empty:
                        break
                    if work_item is not None:
                        work_item.future.cancel()

            # Send a wake-up to prevent threads calling
            # _work_queue.get(block=True) from permanently blocking.
            self._work_queue.put(None)
        # Join outside the lock so it is not held while waiting on workers.
        if wait:
            for t in self._threads:
                t.join()
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
237