import test.support

# Skip tests if _multiprocessing wasn't built.
test.support.import_module('_multiprocessing')
# Skip tests if sem_open implementation is broken.
test.support.import_module('multiprocessing.synchronize')

from test.support.script_helper import assert_python_ok

import contextlib
import itertools
import logging
from logging.handlers import QueueHandler
import os
import queue
import sys
import threading
import time
import unittest
import weakref
from pickle import PicklingError

from concurrent import futures
from concurrent.futures._base import (
    PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future,
    BrokenExecutor)
from concurrent.futures.process import BrokenProcessPool
from multiprocessing import get_context

import multiprocessing.process
import multiprocessing.util


def create_future(state=PENDING, exception=None, result=None):
    f = Future()
    f._state = state
    f._exception = exception
    f._result = result
    return f


PENDING_FUTURE = create_future(state=PENDING)
RUNNING_FUTURE = create_future(state=RUNNING)
CANCELLED_FUTURE = create_future(state=CANCELLED)
CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
EXCEPTION_FUTURE = create_future(state=FINISHED, exception=OSError())
SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)

INITIALIZER_STATUS = 'uninitialized'


def mul(x, y):
    return x * y

def capture(*args, **kwargs):
    return args, kwargs

def sleep_and_raise(t):
    time.sleep(t)
    raise Exception('this is an exception')

def sleep_and_print(t, msg):
    time.sleep(t)
    print(msg)
    sys.stdout.flush()

def init(x):
    global INITIALIZER_STATUS
    INITIALIZER_STATUS = x

def get_init_status():
    return INITIALIZER_STATUS

def init_fail(log_queue=None):
    if log_queue is not None:
        logger = logging.getLogger('concurrent.futures')
        logger.addHandler(QueueHandler(log_queue))
        logger.setLevel('CRITICAL')
        logger.propagate = False
    time.sleep(0.1)  # let some futures be scheduled
    raise ValueError('error in initializer')


class MyObject(object):
    def my_method(self):
        pass


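# Object whose __del__ sets a managed Event; the process pool tests use it
# to check that worker processes release task arguments once a job finishes.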
class EventfulGCObj():
    def __init__(self, ctx):
        mgr = get_context(ctx).Manager()
        self.event = mgr.Event()

    def __del__(self):
        self.event.set()


def make_dummy_object(_):
    return MyObject()


class BaseTestCase(unittest.TestCase):
    def setUp(self):
        self._thread_key = test.support.threading_setup()

    def tearDown(self):
        test.support.reap_children()
        test.support.threading_cleanup(*self._thread_key)


class ExecutorMixin:
    worker_count = 5
    executor_kwargs = {}

    def setUp(self):
        super().setUp()

        self.t1 = time.monotonic()
        if hasattr(self, "ctx"):
            self.executor = self.executor_type(
                max_workers=self.worker_count,
                mp_context=self.get_context(),
                **self.executor_kwargs)
        else:
            self.executor = self.executor_type(
                max_workers=self.worker_count,
                **self.executor_kwargs)
        self._prime_executor()

    def tearDown(self):
        self.executor.shutdown(wait=True)
        self.executor = None

        dt = time.monotonic() - self.t1
        if test.support.verbose:
            print("%.2fs" % dt, end=' ')
        self.assertLess(dt, 300, "synchronization issue: test lasted too long")

        super().tearDown()

    def get_context(self):
        return get_context(self.ctx)

    def _prime_executor(self):
        # Make sure that the executor is ready to do work before running the
        # tests. This should reduce the probability of timeouts in the tests.
        futures = [self.executor.submit(time.sleep, 0.1)
                   for _ in range(self.worker_count)]
        for f in futures:
            f.result()


class ThreadPoolMixin(ExecutorMixin):
    executor_type = futures.ThreadPoolExecutor


class ProcessPoolForkMixin(ExecutorMixin):
    executor_type = futures.ProcessPoolExecutor
    ctx = "fork"

    def get_context(self):
        if sys.platform == "win32":
            self.skipTest("requires a Unix system")
        return super().get_context()


class ProcessPoolSpawnMixin(ExecutorMixin):
    executor_type = futures.ProcessPoolExecutor
    ctx = "spawn"


class ProcessPoolForkserverMixin(ExecutorMixin):
    executor_type = futures.ProcessPoolExecutor
    ctx = "forkserver"

    def get_context(self):
        if sys.platform == "win32":
            self.skipTest("requires a Unix system")
        return super().get_context()


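# Dynamically build concrete test classes: combine a test mixin with each
# executor mixin (thread pool plus the fork, forkserver and spawn
# multiprocessing contexts) and register the result in the module globals so
# that unittest discovers it.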
def create_executor_tests(mixin, bases=(BaseTestCase,),
                          executor_mixins=(ThreadPoolMixin,
                                           ProcessPoolForkMixin,
                                           ProcessPoolForkserverMixin,
                                           ProcessPoolSpawnMixin)):
    def strip_mixin(name):
        if name.endswith(('Mixin', 'Tests')):
            return name[:-5]
        elif name.endswith('Test'):
            return name[:-4]
        else:
            return name

    for exe in executor_mixins:
        name = ("%s%sTest"
                % (strip_mixin(exe.__name__), strip_mixin(mixin.__name__)))
        cls = type(name, (mixin,) + (exe,) + bases, {})
        globals()[name] = cls


class InitializerMixin(ExecutorMixin):
    worker_count = 2

    def setUp(self):
        global INITIALIZER_STATUS
        INITIALIZER_STATUS = 'uninitialized'
        self.executor_kwargs = dict(initializer=init,
                                    initargs=('initialized',))
        super().setUp()

    def test_initializer(self):
        futures = [self.executor.submit(get_init_status)
                   for _ in range(self.worker_count)]

        for f in futures:
            self.assertEqual(f.result(), 'initialized')


class FailingInitializerMixin(ExecutorMixin):
    worker_count = 2

    def setUp(self):
        if hasattr(self, "ctx"):
            # Pass a queue to redirect the child's logging output
            self.mp_context = self.get_context()
            self.log_queue = self.mp_context.Queue()
            self.executor_kwargs = dict(initializer=init_fail,
                                        initargs=(self.log_queue,))
        else:
            # In a thread pool, the child shares our logging setup
            # (see _assert_logged())
            self.mp_context = None
            self.log_queue = None
            self.executor_kwargs = dict(initializer=init_fail)
        super().setUp()

    def test_initializer(self):
        with self._assert_logged('ValueError: error in initializer'):
            try:
                future = self.executor.submit(get_init_status)
            except BrokenExecutor:
                # Perhaps the executor is already broken
                pass
            else:
                with self.assertRaises(BrokenExecutor):
                    future.result()
            # At some point, the executor should break
            t1 = time.monotonic()
            while not self.executor._broken:
                if time.monotonic() - t1 > 5:
                    self.fail("executor not broken after 5 s.")
                time.sleep(0.01)
            # ... and from this point submit() is guaranteed to fail
            with self.assertRaises(BrokenExecutor):
                self.executor.submit(get_init_status)

    def _prime_executor(self):
        pass

    @contextlib.contextmanager
    def _assert_logged(self, msg):
        if self.log_queue is not None:
            yield
            output = []
            try:
                while True:
                    output.append(self.log_queue.get_nowait().getMessage())
            except queue.Empty:
                pass
        else:
            with self.assertLogs('concurrent.futures', 'CRITICAL') as cm:
                yield
            output = cm.output
        self.assertTrue(any(msg in line for line in output),
                        output)


create_executor_tests(InitializerMixin)
create_executor_tests(FailingInitializerMixin)


class ExecutorShutdownTest:
    def test_run_after_shutdown(self):
        self.executor.shutdown()
        self.assertRaises(RuntimeError,
                          self.executor.submit,
                          pow, 2, 5)

    def test_interpreter_shutdown(self):
        # Test the atexit hook for shutdown of worker threads and processes
        rc, out, err = assert_python_ok('-c', """if 1:
            from concurrent.futures import {executor_type}
            from time import sleep
            from test.test_concurrent_futures import sleep_and_print
            if __name__ == "__main__":
                context = '{context}'
                if context == "":
                    t = {executor_type}(5)
                else:
                    from multiprocessing import get_context
                    context = get_context(context)
                    t = {executor_type}(5, mp_context=context)
                t.submit(sleep_and_print, 1.0, "apple")
            """.format(executor_type=self.executor_type.__name__,
                       context=getattr(self, "ctx", "")))
        # Errors in atexit hooks don't change the process exit code;
        # check stderr manually.
        self.assertFalse(err)
        self.assertEqual(out.strip(), b"apple")

    def test_submit_after_interpreter_shutdown(self):
        # Test the atexit hook for shutdown of worker threads and processes
        rc, out, err = assert_python_ok('-c', """if 1:
            import atexit
            @atexit.register
            def run_last():
                try:
                    t.submit(id, None)
                except RuntimeError:
                    print("runtime-error")
                    raise
            from concurrent.futures import {executor_type}
            if __name__ == "__main__":
                context = '{context}'
                if not context:
                    t = {executor_type}(5)
                else:
                    from multiprocessing import get_context
                    context = get_context(context)
                    t = {executor_type}(5, mp_context=context)
                    t.submit(id, 42).result()
            """.format(executor_type=self.executor_type.__name__,
                       context=getattr(self, "ctx", "")))
        # Errors in atexit hooks don't change the process exit code;
        # check stderr manually.
        self.assertIn("RuntimeError: cannot schedule new futures", err.decode())
        self.assertEqual(out.strip(), b"runtime-error")

    def test_hang_issue12364(self):
        fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
        self.executor.shutdown()
        for f in fs:
            f.result()


class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase):
    def _prime_executor(self):
        pass

    def test_threads_terminate(self):
        def acquire_lock(lock):
            lock.acquire()

        sem = threading.Semaphore(0)
        for i in range(3):
            self.executor.submit(acquire_lock, sem)
        self.assertEqual(len(self.executor._threads), 3)
        for i in range(3):
            sem.release()
        self.executor.shutdown()
        for t in self.executor._threads:
            t.join()

    def test_context_manager_shutdown(self):
        with futures.ThreadPoolExecutor(max_workers=5) as e:
            executor = e
            self.assertEqual(list(e.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        for t in executor._threads:
            t.join()

    def test_del_shutdown(self):
        executor = futures.ThreadPoolExecutor(max_workers=5)
        executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor

        for t in threads:
            t.join()

    def test_thread_names_assigned(self):
        executor = futures.ThreadPoolExecutor(
            max_workers=5, thread_name_prefix='SpecialPool')
        executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor

        for t in threads:
            self.assertRegex(t.name, r'^SpecialPool_[0-4]$')
            t.join()

    def test_thread_names_default(self):
        executor = futures.ThreadPoolExecutor(max_workers=5)
        executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor

        for t in threads:
            # Ensure that our default name is reasonably sane and unique when
            # no thread_name_prefix was supplied.
            self.assertRegex(t.name, r'ThreadPoolExecutor-\d+_[0-4]$')
            t.join()


class ProcessPoolShutdownTest(ExecutorShutdownTest):
    def _prime_executor(self):
        pass

    def test_processes_terminate(self):
        self.executor.submit(mul, 21, 2)
        self.executor.submit(mul, 6, 7)
        self.executor.submit(mul, 3, 14)
        self.assertEqual(len(self.executor._processes), 5)
        processes = self.executor._processes
        self.executor.shutdown()

        for p in processes.values():
            p.join()

    def test_context_manager_shutdown(self):
        with futures.ProcessPoolExecutor(max_workers=5) as e:
            processes = e._processes
            self.assertEqual(list(e.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        for p in processes.values():
            p.join()

    def test_del_shutdown(self):
        executor = futures.ProcessPoolExecutor(max_workers=5)
        list(executor.map(abs, range(-5, 5)))
        queue_management_thread = executor._queue_management_thread
        processes = executor._processes
        call_queue = executor._call_queue
        del executor

        # Make sure that all the executor resources were properly cleaned by
        # the shutdown process
        queue_management_thread.join()
        for p in processes.values():
            p.join()
        call_queue.join_thread()


create_executor_tests(ProcessPoolShutdownTest,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))


class WaitTests:

    def test_first_completed(self):
        future1 = self.executor.submit(mul, 21, 2)
        future2 = self.executor.submit(time.sleep, 1.5)

        done, not_done = futures.wait(
                [CANCELLED_FUTURE, future1, future2],
                 return_when=futures.FIRST_COMPLETED)

        self.assertEqual(set([future1]), done)
        self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)

    def test_first_completed_some_already_completed(self):
        future1 = self.executor.submit(time.sleep, 1.5)

        finished, pending = futures.wait(
                 [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
                 return_when=futures.FIRST_COMPLETED)

        self.assertEqual(
                set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
                finished)
        self.assertEqual(set([future1]), pending)

    def test_first_exception(self):
        future1 = self.executor.submit(mul, 2, 21)
        future2 = self.executor.submit(sleep_and_raise, 1.5)
        future3 = self.executor.submit(time.sleep, 3)

        finished, pending = futures.wait(
                [future1, future2, future3],
                return_when=futures.FIRST_EXCEPTION)

        self.assertEqual(set([future1, future2]), finished)
        self.assertEqual(set([future3]), pending)

    def test_first_exception_some_already_complete(self):
        future1 = self.executor.submit(divmod, 21, 0)
        future2 = self.executor.submit(time.sleep, 1.5)

        finished, pending = futures.wait(
                [SUCCESSFUL_FUTURE,
                 CANCELLED_FUTURE,
                 CANCELLED_AND_NOTIFIED_FUTURE,
                 future1, future2],
                return_when=futures.FIRST_EXCEPTION)

        self.assertEqual(set([SUCCESSFUL_FUTURE,
                              CANCELLED_AND_NOTIFIED_FUTURE,
                              future1]), finished)
        self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)

    def test_first_exception_one_already_failed(self):
        future1 = self.executor.submit(time.sleep, 2)

        finished, pending = futures.wait(
                 [EXCEPTION_FUTURE, future1],
                 return_when=futures.FIRST_EXCEPTION)

        self.assertEqual(set([EXCEPTION_FUTURE]), finished)
        self.assertEqual(set([future1]), pending)

    def test_all_completed(self):
        future1 = self.executor.submit(divmod, 2, 0)
        future2 = self.executor.submit(mul, 2, 21)

        finished, pending = futures.wait(
                [SUCCESSFUL_FUTURE,
                 CANCELLED_AND_NOTIFIED_FUTURE,
                 EXCEPTION_FUTURE,
                 future1,
                 future2],
                return_when=futures.ALL_COMPLETED)

        self.assertEqual(set([SUCCESSFUL_FUTURE,
                              CANCELLED_AND_NOTIFIED_FUTURE,
                              EXCEPTION_FUTURE,
                              future1,
                              future2]), finished)
        self.assertEqual(set(), pending)

    def test_timeout(self):
        future1 = self.executor.submit(mul, 6, 7)
        future2 = self.executor.submit(time.sleep, 6)

        finished, pending = futures.wait(
                [CANCELLED_AND_NOTIFIED_FUTURE,
                 EXCEPTION_FUTURE,
                 SUCCESSFUL_FUTURE,
                 future1, future2],
                timeout=5,
                return_when=futures.ALL_COMPLETED)

        self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
                              EXCEPTION_FUTURE,
                              SUCCESSFUL_FUTURE,
                              future1]), finished)
        self.assertEqual(set([future2]), pending)


class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, BaseTestCase):

    def test_pending_calls_race(self):
        # Issue #14406: multi-threaded race condition when waiting on all
        # futures.
        event = threading.Event()
        def future_func():
            event.wait()
        oldswitchinterval = sys.getswitchinterval()
        sys.setswitchinterval(1e-6)
        try:
            fs = {self.executor.submit(future_func) for i in range(100)}
            event.set()
            futures.wait(fs, return_when=futures.ALL_COMPLETED)
        finally:
            sys.setswitchinterval(oldswitchinterval)


create_executor_tests(WaitTests,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))


class AsCompletedTests:
    # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
    def test_no_timeout(self):
        future1 = self.executor.submit(mul, 2, 21)
        future2 = self.executor.submit(mul, 7, 6)

        completed = set(futures.as_completed(
                [CANCELLED_AND_NOTIFIED_FUTURE,
                 EXCEPTION_FUTURE,
                 SUCCESSFUL_FUTURE,
                 future1, future2]))
        self.assertEqual(set(
                [CANCELLED_AND_NOTIFIED_FUTURE,
                 EXCEPTION_FUTURE,
                 SUCCESSFUL_FUTURE,
                 future1, future2]),
                completed)

    def test_zero_timeout(self):
        future1 = self.executor.submit(time.sleep, 2)
        completed_futures = set()
        try:
            for future in futures.as_completed(
                    [CANCELLED_AND_NOTIFIED_FUTURE,
                     EXCEPTION_FUTURE,
                     SUCCESSFUL_FUTURE,
                     future1],
                    timeout=0):
                completed_futures.add(future)
        except futures.TimeoutError:
            pass

        self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
                              EXCEPTION_FUTURE,
                              SUCCESSFUL_FUTURE]),
                         completed_futures)

    def test_duplicate_futures(self):
        # Issue 20367. Duplicate futures should not raise exceptions or give
        # duplicate responses.
        # Issue #31641: accept arbitrary iterables.
        future1 = self.executor.submit(time.sleep, 2)
        completed = [
            f for f in futures.as_completed(itertools.repeat(future1, 3))
        ]
        self.assertEqual(len(completed), 1)

    def test_free_reference_yielded_future(self):
        # Issue #14406: Generator should not keep references
        # to finished futures.
        futures_list = [Future() for _ in range(8)]
        futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED))
        futures_list.append(create_future(state=FINISHED, result=42))

        with self.assertRaises(futures.TimeoutError):
            for future in futures.as_completed(futures_list, timeout=0):
                futures_list.remove(future)
                wr = weakref.ref(future)
                del future
                self.assertIsNone(wr())

        futures_list[0].set_result("test")
        for future in futures.as_completed(futures_list):
            futures_list.remove(future)
            wr = weakref.ref(future)
            del future
            self.assertIsNone(wr())
            if futures_list:
                futures_list[0].set_result("test")

    def test_correct_timeout_exception_msg(self):
        futures_list = [CANCELLED_AND_NOTIFIED_FUTURE, PENDING_FUTURE,
                        RUNNING_FUTURE, SUCCESSFUL_FUTURE]

        with self.assertRaises(futures.TimeoutError) as cm:
            list(futures.as_completed(futures_list, timeout=0))

        self.assertEqual(str(cm.exception), '2 (of 4) futures unfinished')


create_executor_tests(AsCompletedTests)


class ExecutorTest:
    # Executor.shutdown() and context manager usage is tested by
    # ExecutorShutdownTest.
    def test_submit(self):
        future = self.executor.submit(pow, 2, 8)
        self.assertEqual(256, future.result())

    def test_submit_keyword(self):
        future = self.executor.submit(mul, 2, y=8)
        self.assertEqual(16, future.result())
        future = self.executor.submit(capture, 1, self=2, fn=3)
        self.assertEqual(future.result(), ((1,), {'self': 2, 'fn': 3}))
        with self.assertWarns(DeprecationWarning):
            future = self.executor.submit(fn=capture, arg=1)
        self.assertEqual(future.result(), ((), {'arg': 1}))
        with self.assertRaises(TypeError):
            self.executor.submit(arg=1)

    def test_map(self):
        self.assertEqual(
                list(self.executor.map(pow, range(10), range(10))),
                list(map(pow, range(10), range(10))))

        self.assertEqual(
                list(self.executor.map(pow, range(10), range(10), chunksize=3)),
                list(map(pow, range(10), range(10))))

    def test_map_exception(self):
        i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
        self.assertEqual(i.__next__(), (0, 1))
        self.assertEqual(i.__next__(), (0, 1))
        self.assertRaises(ZeroDivisionError, i.__next__)

    def test_map_timeout(self):
        results = []
        try:
            for i in self.executor.map(time.sleep,
                                       [0, 0, 6],
                                       timeout=5):
                results.append(i)
        except futures.TimeoutError:
            pass
        else:
            self.fail('expected TimeoutError')

        self.assertEqual([None, None], results)

    def test_shutdown_race_issue12456(self):
        # Issue #12456: race condition at shutdown where trying to post a
        # sentinel in the call queue blocks (the queue is full while processes
        # have exited).
        self.executor.map(str, [2] * (self.worker_count + 1))
        self.executor.shutdown()

    @test.support.cpython_only
    def test_no_stale_references(self):
        # Issue #16284: check that the executors don't unnecessarily hang onto
        # references.
        my_object = MyObject()
        my_object_collected = threading.Event()
        my_object_callback = weakref.ref(
            my_object, lambda obj: my_object_collected.set())
        # Deliberately discarding the future.
        self.executor.submit(my_object.my_method)
        del my_object

        collected = my_object_collected.wait(timeout=5.0)
        self.assertTrue(collected,
                        "Stale reference not collected within timeout.")

    def test_max_workers_negative(self):
        for number in (0, -1):
            with self.assertRaisesRegex(ValueError,
                                        "max_workers must be greater "
                                        "than 0"):
                self.executor_type(max_workers=number)

    def test_free_reference(self):
        # Issue #14406: Result iterator should not keep an internal
        # reference to result objects.
        for obj in self.executor.map(make_dummy_object, range(10)):
            wr = weakref.ref(obj)
            del obj
            self.assertIsNone(wr())


class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, BaseTestCase):
    def test_map_submits_without_iteration(self):
        """Tests verifying issue 11777."""
        finished = []
        def record_finished(n):
            finished.append(n)

        self.executor.map(record_finished, range(10))
        self.executor.shutdown(wait=True)
        self.assertCountEqual(finished, range(10))

    def test_default_workers(self):
        executor = self.executor_type()
        expected = min(32, (os.cpu_count() or 1) + 4)
        self.assertEqual(executor._max_workers, expected)

    def test_saturation(self):
        executor = self.executor_type(4)
        def acquire_lock(lock):
            lock.acquire()

        sem = threading.Semaphore(0)
        for i in range(15 * executor._max_workers):
            executor.submit(acquire_lock, sem)
        self.assertEqual(len(executor._threads), executor._max_workers)
        for i in range(15 * executor._max_workers):
            sem.release()
        executor.shutdown(wait=True)

    def test_idle_thread_reuse(self):
        executor = self.executor_type()
        executor.submit(mul, 21, 2).result()
        executor.submit(mul, 6, 7).result()
        executor.submit(mul, 3, 14).result()
        self.assertEqual(len(executor._threads), 1)
        executor.shutdown(wait=True)


class ProcessPoolExecutorTest(ExecutorTest):

    @unittest.skipUnless(sys.platform=='win32', 'Windows-only process limit')
    def test_max_workers_too_large(self):
        with self.assertRaisesRegex(ValueError,
                                    "max_workers must be <= 61"):
            futures.ProcessPoolExecutor(max_workers=62)

    def test_killed_child(self):
        # When a child process is abruptly terminated, the whole pool gets
        # "broken".
        futures = [self.executor.submit(time.sleep, 3)]
        # Get one of the processes, and terminate (kill) it
        p = next(iter(self.executor._processes.values()))
        p.terminate()
        for fut in futures:
            self.assertRaises(BrokenProcessPool, fut.result)
        # Submitting other jobs fails as well.
        self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)

    def test_map_chunksize(self):
        def bad_map():
            list(self.executor.map(pow, range(40), range(40), chunksize=-1))

        ref = list(map(pow, range(40), range(40)))
        self.assertEqual(
            list(self.executor.map(pow, range(40), range(40), chunksize=6)),
            ref)
        self.assertEqual(
            list(self.executor.map(pow, range(40), range(40), chunksize=50)),
            ref)
        self.assertEqual(
            list(self.executor.map(pow, range(40), range(40), chunksize=40)),
            ref)
        self.assertRaises(ValueError, bad_map)

    @classmethod
    def _test_traceback(cls):
        raise RuntimeError(123) # some comment

    def test_traceback(self):
        # We want to ensure that the traceback from the child process is
        # contained in the traceback raised in the main process.
        future = self.executor.submit(self._test_traceback)
        with self.assertRaises(Exception) as cm:
            future.result()

        exc = cm.exception
        self.assertIs(type(exc), RuntimeError)
        self.assertEqual(exc.args, (123,))
        cause = exc.__cause__
        self.assertIs(type(cause), futures.process._RemoteTraceback)
        self.assertIn('raise RuntimeError(123) # some comment', cause.tb)

        with test.support.captured_stderr() as f1:
            try:
                raise exc
            except RuntimeError:
                sys.excepthook(*sys.exc_info())
        self.assertIn('raise RuntimeError(123) # some comment',
                      f1.getvalue())

    def test_resources_gced_in_workers(self):
        # Ensure that the arguments for a job are correctly gc-ed after the
        # job has finished
        obj = EventfulGCObj(self.ctx)
        future = self.executor.submit(id, obj)
        future.result()

        self.assertTrue(obj.event.wait(timeout=1))


create_executor_tests(ProcessPoolExecutorTest,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))

def hide_process_stderr():
    import io
    sys.stderr = io.StringIO()


def _crash(delay=None):
    """Induces a segfault."""
    if delay:
        time.sleep(delay)
    import faulthandler
    faulthandler.disable()
    faulthandler._sigsegv()


def _exit():
    """Induces a sys exit with exitcode 1."""
    sys.exit(1)


def _raise_error(Err):
    """Function that raises an Exception in process."""
    hide_process_stderr()
    raise Err()


def _return_instance(cls):
    """Function that returns an instance of cls."""
    hide_process_stderr()
    return cls()


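# The "bad object" classes below misbehave at specific points of a task's
# life cycle (pickling or unpickling the task, running it, pickling or
# unpickling its result); ExecutorDeadlockTest submits them to check that
# the pool breaks cleanly instead of deadlocking.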
class CrashAtPickle(object):
    """Bad object that triggers a segfault at pickling time."""
    def __reduce__(self):
        _crash()


class CrashAtUnpickle(object):
    """Bad object that triggers a segfault at unpickling time."""
    def __reduce__(self):
        return _crash, ()


class ExitAtPickle(object):
    """Bad object that triggers a process exit at pickling time."""
    def __reduce__(self):
        _exit()


class ExitAtUnpickle(object):
    """Bad object that triggers a process exit at unpickling time."""
    def __reduce__(self):
        return _exit, ()


class ErrorAtPickle(object):
    """Bad object that triggers an error at pickling time."""
    def __reduce__(self):
        from pickle import PicklingError
        raise PicklingError("Error in pickle")


class ErrorAtUnpickle(object):
    """Bad object that triggers an error at unpickling time."""
    def __reduce__(self):
        from pickle import UnpicklingError
        return _raise_error, (UnpicklingError, )


class ExecutorDeadlockTest:
    TIMEOUT = 15

    @classmethod
    def _sleep_id(cls, x, delay):
        time.sleep(delay)
        return x

    def _fail_on_deadlock(self, executor):
        # If we did not recover before TIMEOUT seconds, consider that the
        # executor is in a deadlock state and forcefully clean all its
        # components.
        import faulthandler
        from tempfile import TemporaryFile
        with TemporaryFile(mode="w+") as f:
            faulthandler.dump_traceback(file=f)
            f.seek(0)
            tb = f.read()
        for p in executor._processes.values():
            p.terminate()
        # It should be safe to call executor.shutdown() here as all possible
        # deadlocks should have been broken.
        executor.shutdown(wait=True)
        print(f"\nTraceback:\n {tb}", file=sys.__stderr__)
        self.fail(f"Executor deadlock:\n\n{tb}")


    def test_crash(self):
        # extensive testing for deadlock caused by crashes in a pool.
        self.executor.shutdown(wait=True)
        crash_cases = [
            # Check problem occurring while pickling a task in
            # the task_handler thread
            (id, (ErrorAtPickle(),), PicklingError, "error at task pickle"),
            # Check problem occurring while unpickling a task on workers
            (id, (ExitAtUnpickle(),), BrokenProcessPool,
             "exit at task unpickle"),
            (id, (ErrorAtUnpickle(),), BrokenProcessPool,
             "error at task unpickle"),
            (id, (CrashAtUnpickle(),), BrokenProcessPool,
             "crash at task unpickle"),
            # Check problem occurring during func execution on workers
            (_crash, (), BrokenProcessPool,
             "crash during func execution on worker"),
            (_exit, (), SystemExit,
             "exit during func execution on worker"),
            (_raise_error, (RuntimeError, ), RuntimeError,
             "error during func execution on worker"),
            # Check problem occurring while pickling a task result
            # on workers
            (_return_instance, (CrashAtPickle,), BrokenProcessPool,
             "crash during result pickle on worker"),
            (_return_instance, (ExitAtPickle,), SystemExit,
             "exit during result pickle on worker"),
            (_return_instance, (ErrorAtPickle,), PicklingError,
             "error during result pickle on worker"),
            # Check problem occurring while unpickling a task in
            # the result_handler thread
            (_return_instance, (ErrorAtUnpickle,), BrokenProcessPool,
             "error during result unpickle in result_handler"),
            (_return_instance, (ExitAtUnpickle,), BrokenProcessPool,
             "exit during result unpickle in result_handler")
        ]
        for func, args, error, name in crash_cases:
            with self.subTest(name):
                # The captured_stderr reduces the noise in the test report
                with test.support.captured_stderr():
                    executor = self.executor_type(
                        max_workers=2, mp_context=get_context(self.ctx))
                    res = executor.submit(func, *args)
                    with self.assertRaises(error):
                        try:
                            res.result(timeout=self.TIMEOUT)
                        except futures.TimeoutError:
                            # If we did not recover before TIMEOUT seconds,
                            # consider that the executor is in a deadlock state
                            self._fail_on_deadlock(executor)
                    executor.shutdown(wait=True)

    def test_shutdown_deadlock(self):
        # Test that calling shutdown on the pool does not cause a deadlock
        # if a worker fails after the shutdown call.
        self.executor.shutdown(wait=True)
        with self.executor_type(max_workers=2,
                                mp_context=get_context(self.ctx)) as executor:
            self.executor = executor  # Allow clean up in fail_on_deadlock
            f = executor.submit(_crash, delay=.1)
            executor.shutdown(wait=True)
            with self.assertRaises(BrokenProcessPool):
                f.result()


create_executor_tests(ExecutorDeadlockTest,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))


class FutureTests(BaseTestCase):
    def test_done_callback_with_result(self):
        callback_result = None
        def fn(callback_future):
            nonlocal callback_result
            callback_result = callback_future.result()

        f = Future()
        f.add_done_callback(fn)
        f.set_result(5)
        self.assertEqual(5, callback_result)

    def test_done_callback_with_exception(self):
        callback_exception = None
        def fn(callback_future):
            nonlocal callback_exception
            callback_exception = callback_future.exception()

        f = Future()
        f.add_done_callback(fn)
        f.set_exception(Exception('test'))
        self.assertEqual(('test',), callback_exception.args)

    def test_done_callback_with_cancel(self):
        was_cancelled = None
        def fn(callback_future):
            nonlocal was_cancelled
            was_cancelled = callback_future.cancelled()

        f = Future()
        f.add_done_callback(fn)
        self.assertTrue(f.cancel())
        self.assertTrue(was_cancelled)

    def test_done_callback_raises(self):
        with test.support.captured_stderr() as stderr:
            raising_was_called = False
            fn_was_called = False

            def raising_fn(callback_future):
                nonlocal raising_was_called
                raising_was_called = True
                raise Exception('doh!')

            def fn(callback_future):
                nonlocal fn_was_called
                fn_was_called = True

            f = Future()
            f.add_done_callback(raising_fn)
            f.add_done_callback(fn)
            f.set_result(5)
            self.assertTrue(raising_was_called)
            self.assertTrue(fn_was_called)
            self.assertIn('Exception: doh!', stderr.getvalue())

    def test_done_callback_already_successful(self):
        callback_result = None
        def fn(callback_future):
            nonlocal callback_result
            callback_result = callback_future.result()

        f = Future()
        f.set_result(5)
        f.add_done_callback(fn)
        self.assertEqual(5, callback_result)

    def test_done_callback_already_failed(self):
        callback_exception = None
        def fn(callback_future):
            nonlocal callback_exception
            callback_exception = callback_future.exception()

        f = Future()
        f.set_exception(Exception('test'))
        f.add_done_callback(fn)
        self.assertEqual(('test',), callback_exception.args)

    def test_done_callback_already_cancelled(self):
        was_cancelled = None
        def fn(callback_future):
            nonlocal was_cancelled
            was_cancelled = callback_future.cancelled()

        f = Future()
        self.assertTrue(f.cancel())
        f.add_done_callback(fn)
        self.assertTrue(was_cancelled)

    def test_done_callback_raises_already_succeeded(self):
        with test.support.captured_stderr() as stderr:
            def raising_fn(callback_future):
                raise Exception('doh!')

            f = Future()

            # Set the result first to simulate a future that runs instantly,
            # effectively allowing the callback to be run immediately.
            f.set_result(5)
            f.add_done_callback(raising_fn)

            self.assertIn('exception calling callback for', stderr.getvalue())
            self.assertIn('doh!', stderr.getvalue())


    def test_repr(self):
        self.assertRegex(repr(PENDING_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=pending>')
        self.assertRegex(repr(RUNNING_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=running>')
        self.assertRegex(repr(CANCELLED_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=cancelled>')
        self.assertRegex(repr(CANCELLED_AND_NOTIFIED_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=cancelled>')
        self.assertRegex(
                repr(EXCEPTION_FUTURE),
                '<Future at 0x[0-9a-f]+ state=finished raised OSError>')
        self.assertRegex(
                repr(SUCCESSFUL_FUTURE),
                '<Future at 0x[0-9a-f]+ state=finished returned int>')


    def test_cancel(self):
        f1 = create_future(state=PENDING)
        f2 = create_future(state=RUNNING)
        f3 = create_future(state=CANCELLED)
        f4 = create_future(state=CANCELLED_AND_NOTIFIED)
        f5 = create_future(state=FINISHED, exception=OSError())
        f6 = create_future(state=FINISHED, result=5)

        self.assertTrue(f1.cancel())
        self.assertEqual(f1._state, CANCELLED)

        self.assertFalse(f2.cancel())
        self.assertEqual(f2._state, RUNNING)

        self.assertTrue(f3.cancel())
        self.assertEqual(f3._state, CANCELLED)

        self.assertTrue(f4.cancel())
        self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED)

        self.assertFalse(f5.cancel())
        self.assertEqual(f5._state, FINISHED)

        self.assertFalse(f6.cancel())
        self.assertEqual(f6._state, FINISHED)

    def test_cancelled(self):
        self.assertFalse(PENDING_FUTURE.cancelled())
        self.assertFalse(RUNNING_FUTURE.cancelled())
        self.assertTrue(CANCELLED_FUTURE.cancelled())
        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled())
        self.assertFalse(EXCEPTION_FUTURE.cancelled())
        self.assertFalse(SUCCESSFUL_FUTURE.cancelled())

    def test_done(self):
        self.assertFalse(PENDING_FUTURE.done())
        self.assertFalse(RUNNING_FUTURE.done())
        self.assertTrue(CANCELLED_FUTURE.done())
        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done())
        self.assertTrue(EXCEPTION_FUTURE.done())
        self.assertTrue(SUCCESSFUL_FUTURE.done())

    def test_running(self):
        self.assertFalse(PENDING_FUTURE.running())
        self.assertTrue(RUNNING_FUTURE.running())
        self.assertFalse(CANCELLED_FUTURE.running())
        self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running())
        self.assertFalse(EXCEPTION_FUTURE.running())
        self.assertFalse(SUCCESSFUL_FUTURE.running())

    def test_result_with_timeout(self):
        self.assertRaises(futures.TimeoutError,
                          PENDING_FUTURE.result, timeout=0)
        self.assertRaises(futures.TimeoutError,
                          RUNNING_FUTURE.result, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_FUTURE.result, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0)
        self.assertRaises(OSError, EXCEPTION_FUTURE.result, timeout=0)
        self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)

    def test_result_with_success(self):
        # TODO(brian@sweetapp.com): This test is timing dependent.
        def notification():
            # Wait until the main thread is waiting for the result.
            time.sleep(1)
            f1.set_result(42)

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()

        self.assertEqual(f1.result(timeout=5), 42)
        t.join()

    def test_result_with_cancel(self):
        # TODO(brian@sweetapp.com): This test is timing dependent.
        def notification():
            # Wait until the main thread is waiting for the result.
            time.sleep(1)
            f1.cancel()

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()

        self.assertRaises(futures.CancelledError, f1.result, timeout=5)
        t.join()

    def test_exception_with_timeout(self):
        self.assertRaises(futures.TimeoutError,
                          PENDING_FUTURE.exception, timeout=0)
        self.assertRaises(futures.TimeoutError,
                          RUNNING_FUTURE.exception, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_FUTURE.exception, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0)
        self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0),
                                   OSError))
        self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None)

    def test_exception_with_success(self):
        def notification():
            # Wait until the main thread is waiting for the exception.
            time.sleep(1)
            with f1._condition:
                f1._state = FINISHED
                f1._exception = OSError()
                f1._condition.notify_all()

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()

        self.assertTrue(isinstance(f1.exception(timeout=5), OSError))
        t.join()

    def test_multiple_set_result(self):
        f = create_future(state=PENDING)
        f.set_result(1)

        with self.assertRaisesRegex(
                futures.InvalidStateError,
                'FINISHED: <Future at 0x[0-9a-f]+ '
                'state=finished returned int>'
        ):
            f.set_result(2)

        self.assertTrue(f.done())
        self.assertEqual(f.result(), 1)

    def test_multiple_set_exception(self):
        f = create_future(state=PENDING)
        e = ValueError()
        f.set_exception(e)

        with self.assertRaisesRegex(
                futures.InvalidStateError,
                'FINISHED: <Future at 0x[0-9a-f]+ '
                'state=finished raised ValueError>'
        ):
            f.set_exception(Exception())

        self.assertEqual(f.exception(), e)


_threads_key = None

def setUpModule():
    global _threads_key
    _threads_key = test.support.threading_setup()


def tearDownModule():
    test.support.threading_cleanup(*_threads_key)
    multiprocessing.util._cleanup_tests()


if __name__ == "__main__":
    unittest.main()