• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1from test import support
2from test.support import import_helper
3from test.support import threading_helper
4
5# Skip tests if _multiprocessing wasn't built.
6import_helper.import_module('_multiprocessing')
7
8from test.support import hashlib_helper
9from test.support.script_helper import assert_python_ok
10
11import contextlib
12import itertools
13import logging
14from logging.handlers import QueueHandler
15import os
16import queue
17import sys
18import threading
19import time
20import unittest
21import weakref
22from pickle import PicklingError
23
24from concurrent import futures
25from concurrent.futures._base import (
26    PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future,
27    BrokenExecutor)
28from concurrent.futures.process import BrokenProcessPool, _check_system_limits
29from multiprocessing import get_context
30
31import multiprocessing.process
32import multiprocessing.util
33
34
35if support.check_sanitizer(address=True, memory=True):
36    # bpo-46633: Skip the test because it is too slow when Python is built
37    # with ASAN/MSAN: between 5 and 20 minutes on GitHub Actions.
38    raise unittest.SkipTest("test too slow on ASAN/MSAN build")
39
40
41def create_future(state=PENDING, exception=None, result=None):
42    f = Future()
43    f._state = state
44    f._exception = exception
45    f._result = result
46    return f
47
48
49PENDING_FUTURE = create_future(state=PENDING)
50RUNNING_FUTURE = create_future(state=RUNNING)
51CANCELLED_FUTURE = create_future(state=CANCELLED)
52CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
53EXCEPTION_FUTURE = create_future(state=FINISHED, exception=OSError())
54SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)
55
56INITIALIZER_STATUS = 'uninitialized'
57
58
59def mul(x, y):
60    return x * y
61
62def capture(*args, **kwargs):
63    return args, kwargs
64
65def sleep_and_raise(t):
66    time.sleep(t)
67    raise Exception('this is an exception')
68
69def sleep_and_print(t, msg):
70    time.sleep(t)
71    print(msg)
72    sys.stdout.flush()
73
74def init(x):
75    global INITIALIZER_STATUS
76    INITIALIZER_STATUS = x
77
78def get_init_status():
79    return INITIALIZER_STATUS
80
81def init_fail(log_queue=None):
82    if log_queue is not None:
83        logger = logging.getLogger('concurrent.futures')
84        logger.addHandler(QueueHandler(log_queue))
85        logger.setLevel('CRITICAL')
86        logger.propagate = False
87    time.sleep(0.1)  # let some futures be scheduled
88    raise ValueError('error in initializer')
89
90
91class MyObject(object):
92    def my_method(self):
93        pass
94
95
96class EventfulGCObj():
97    def __init__(self, mgr):
98        self.event = mgr.Event()
99
100    def __del__(self):
101        self.event.set()
102
103
104def make_dummy_object(_):
105    return MyObject()
106
107
108class BaseTestCase(unittest.TestCase):
109    def setUp(self):
110        self._thread_key = threading_helper.threading_setup()
111
112    def tearDown(self):
113        support.reap_children()
114        threading_helper.threading_cleanup(*self._thread_key)
115
116
117class ExecutorMixin:
118    worker_count = 5
119    executor_kwargs = {}
120
121    def setUp(self):
122        super().setUp()
123
124        self.t1 = time.monotonic()
125        if hasattr(self, "ctx"):
126            self.executor = self.executor_type(
127                max_workers=self.worker_count,
128                mp_context=self.get_context(),
129                **self.executor_kwargs)
130        else:
131            self.executor = self.executor_type(
132                max_workers=self.worker_count,
133                **self.executor_kwargs)
134        self._prime_executor()
135
136    def tearDown(self):
137        self.executor.shutdown(wait=True)
138        self.executor = None
139
140        dt = time.monotonic() - self.t1
141        if support.verbose:
142            print("%.2fs" % dt, end=' ')
143        self.assertLess(dt, 300, "synchronization issue: test lasted too long")
144
145        super().tearDown()
146
147    def get_context(self):
148        return get_context(self.ctx)
149
150    def _prime_executor(self):
151        # Make sure that the executor is ready to do work before running the
152        # tests. This should reduce the probability of timeouts in the tests.
153        futures = [self.executor.submit(time.sleep, 0.1)
154                   for _ in range(self.worker_count)]
155        for f in futures:
156            f.result()
157
158
159class ThreadPoolMixin(ExecutorMixin):
160    executor_type = futures.ThreadPoolExecutor
161
162
163class ProcessPoolForkMixin(ExecutorMixin):
164    executor_type = futures.ProcessPoolExecutor
165    ctx = "fork"
166
167    def get_context(self):
168        try:
169            _check_system_limits()
170        except NotImplementedError:
171            self.skipTest("ProcessPoolExecutor unavailable on this system")
172        if sys.platform == "win32":
173            self.skipTest("require unix system")
174        return super().get_context()
175
176
177class ProcessPoolSpawnMixin(ExecutorMixin):
178    executor_type = futures.ProcessPoolExecutor
179    ctx = "spawn"
180
181    def get_context(self):
182        try:
183            _check_system_limits()
184        except NotImplementedError:
185            self.skipTest("ProcessPoolExecutor unavailable on this system")
186        return super().get_context()
187
188
189class ProcessPoolForkserverMixin(ExecutorMixin):
190    executor_type = futures.ProcessPoolExecutor
191    ctx = "forkserver"
192
193    def get_context(self):
194        try:
195            _check_system_limits()
196        except NotImplementedError:
197            self.skipTest("ProcessPoolExecutor unavailable on this system")
198        if sys.platform == "win32":
199            self.skipTest("require unix system")
200        return super().get_context()
201
202
203def create_executor_tests(mixin, bases=(BaseTestCase,),
204                          executor_mixins=(ThreadPoolMixin,
205                                           ProcessPoolForkMixin,
206                                           ProcessPoolForkserverMixin,
207                                           ProcessPoolSpawnMixin)):
208    def strip_mixin(name):
209        if name.endswith(('Mixin', 'Tests')):
210            return name[:-5]
211        elif name.endswith('Test'):
212            return name[:-4]
213        else:
214            return name
215
216    for exe in executor_mixins:
217        name = ("%s%sTest"
218                % (strip_mixin(exe.__name__), strip_mixin(mixin.__name__)))
219        cls = type(name, (mixin,) + (exe,) + bases, {})
220        globals()[name] = cls
221
222
223class InitializerMixin(ExecutorMixin):
224    worker_count = 2
225
226    def setUp(self):
227        global INITIALIZER_STATUS
228        INITIALIZER_STATUS = 'uninitialized'
229        self.executor_kwargs = dict(initializer=init,
230                                    initargs=('initialized',))
231        super().setUp()
232
233    def test_initializer(self):
234        futures = [self.executor.submit(get_init_status)
235                   for _ in range(self.worker_count)]
236
237        for f in futures:
238            self.assertEqual(f.result(), 'initialized')
239
240
241class FailingInitializerMixin(ExecutorMixin):
242    worker_count = 2
243
244    def setUp(self):
245        if hasattr(self, "ctx"):
246            # Pass a queue to redirect the child's logging output
247            self.mp_context = self.get_context()
248            self.log_queue = self.mp_context.Queue()
249            self.executor_kwargs = dict(initializer=init_fail,
250                                        initargs=(self.log_queue,))
251        else:
252            # In a thread pool, the child shares our logging setup
253            # (see _assert_logged())
254            self.mp_context = None
255            self.log_queue = None
256            self.executor_kwargs = dict(initializer=init_fail)
257        super().setUp()
258
259    def test_initializer(self):
260        with self._assert_logged('ValueError: error in initializer'):
261            try:
262                future = self.executor.submit(get_init_status)
263            except BrokenExecutor:
264                # Perhaps the executor is already broken
265                pass
266            else:
267                with self.assertRaises(BrokenExecutor):
268                    future.result()
269            # At some point, the executor should break
270            t1 = time.monotonic()
271            while not self.executor._broken:
272                if time.monotonic() - t1 > 5:
273                    self.fail("executor not broken after 5 s.")
274                time.sleep(0.01)
275            # ... and from this point submit() is guaranteed to fail
276            with self.assertRaises(BrokenExecutor):
277                self.executor.submit(get_init_status)
278
279    def _prime_executor(self):
280        pass
281
282    @contextlib.contextmanager
283    def _assert_logged(self, msg):
284        if self.log_queue is not None:
285            yield
286            output = []
287            try:
288                while True:
289                    output.append(self.log_queue.get_nowait().getMessage())
290            except queue.Empty:
291                pass
292        else:
293            with self.assertLogs('concurrent.futures', 'CRITICAL') as cm:
294                yield
295            output = cm.output
296        self.assertTrue(any(msg in line for line in output),
297                        output)
298
299
300create_executor_tests(InitializerMixin)
301create_executor_tests(FailingInitializerMixin)
302
303
304class ExecutorShutdownTest:
305    def test_run_after_shutdown(self):
306        self.executor.shutdown()
307        self.assertRaises(RuntimeError,
308                          self.executor.submit,
309                          pow, 2, 5)
310
311    def test_interpreter_shutdown(self):
312        # Test the atexit hook for shutdown of worker threads and processes
313        rc, out, err = assert_python_ok('-c', """if 1:
314            from concurrent.futures import {executor_type}
315            from time import sleep
316            from test.test_concurrent_futures import sleep_and_print
317            if __name__ == "__main__":
318                context = '{context}'
319                if context == "":
320                    t = {executor_type}(5)
321                else:
322                    from multiprocessing import get_context
323                    context = get_context(context)
324                    t = {executor_type}(5, mp_context=context)
325                t.submit(sleep_and_print, 1.0, "apple")
326            """.format(executor_type=self.executor_type.__name__,
327                       context=getattr(self, "ctx", "")))
328        # Errors in atexit hooks don't change the process exit code, check
329        # stderr manually.
330        self.assertFalse(err)
331        self.assertEqual(out.strip(), b"apple")
332
333    def test_submit_after_interpreter_shutdown(self):
334        # Test the atexit hook for shutdown of worker threads and processes
335        rc, out, err = assert_python_ok('-c', """if 1:
336            import atexit
337            @atexit.register
338            def run_last():
339                try:
340                    t.submit(id, None)
341                except RuntimeError:
342                    print("runtime-error")
343                    raise
344            from concurrent.futures import {executor_type}
345            if __name__ == "__main__":
346                context = '{context}'
347                if not context:
348                    t = {executor_type}(5)
349                else:
350                    from multiprocessing import get_context
351                    context = get_context(context)
352                    t = {executor_type}(5, mp_context=context)
353                    t.submit(id, 42).result()
354            """.format(executor_type=self.executor_type.__name__,
355                       context=getattr(self, "ctx", "")))
356        # Errors in atexit hooks don't change the process exit code, check
357        # stderr manually.
358        self.assertIn("RuntimeError: cannot schedule new futures", err.decode())
359        self.assertEqual(out.strip(), b"runtime-error")
360
361    def test_hang_issue12364(self):
362        fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
363        self.executor.shutdown()
364        for f in fs:
365            f.result()
366
367    def test_cancel_futures(self):
368        executor = self.executor_type(max_workers=3)
369        fs = [executor.submit(time.sleep, .1) for _ in range(50)]
370        executor.shutdown(cancel_futures=True)
371        # We can't guarantee the exact number of cancellations, but we can
372        # guarantee that *some* were cancelled. With setting max_workers to 3,
373        # most of the submitted futures should have been cancelled.
374        cancelled = [fut for fut in fs if fut.cancelled()]
375        self.assertTrue(len(cancelled) >= 35, msg=f"{len(cancelled)=}")
376
377        # Ensure the other futures were able to finish.
378        # Use "not fut.cancelled()" instead of "fut.done()" to include futures
379        # that may have been left in a pending state.
380        others = [fut for fut in fs if not fut.cancelled()]
381        for fut in others:
382            self.assertTrue(fut.done(), msg=f"{fut._state=}")
383            self.assertIsNone(fut.exception())
384
385        # Similar to the number of cancelled futures, we can't guarantee the
386        # exact number that completed. But, we can guarantee that at least
387        # one finished.
388        self.assertTrue(len(others) > 0, msg=f"{len(others)=}")
389
390    def test_hang_issue39205(self):
391        """shutdown(wait=False) doesn't hang at exit with running futures.
392
393        See https://bugs.python.org/issue39205.
394        """
395        if self.executor_type == futures.ProcessPoolExecutor:
396            raise unittest.SkipTest(
397                "Hangs due to https://bugs.python.org/issue39205")
398
399        rc, out, err = assert_python_ok('-c', """if True:
400            from concurrent.futures import {executor_type}
401            from test.test_concurrent_futures import sleep_and_print
402            if __name__ == "__main__":
403                t = {executor_type}(max_workers=3)
404                t.submit(sleep_and_print, 1.0, "apple")
405                t.shutdown(wait=False)
406            """.format(executor_type=self.executor_type.__name__))
407        self.assertFalse(err)
408        self.assertEqual(out.strip(), b"apple")
409
410
411class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase):
412    def _prime_executor(self):
413        pass
414
415    def test_threads_terminate(self):
416        def acquire_lock(lock):
417            lock.acquire()
418
419        sem = threading.Semaphore(0)
420        for i in range(3):
421            self.executor.submit(acquire_lock, sem)
422        self.assertEqual(len(self.executor._threads), 3)
423        for i in range(3):
424            sem.release()
425        self.executor.shutdown()
426        for t in self.executor._threads:
427            t.join()
428
429    def test_context_manager_shutdown(self):
430        with futures.ThreadPoolExecutor(max_workers=5) as e:
431            executor = e
432            self.assertEqual(list(e.map(abs, range(-5, 5))),
433                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
434
435        for t in executor._threads:
436            t.join()
437
438    def test_del_shutdown(self):
439        executor = futures.ThreadPoolExecutor(max_workers=5)
440        res = executor.map(abs, range(-5, 5))
441        threads = executor._threads
442        del executor
443
444        for t in threads:
445            t.join()
446
447        # Make sure the results were all computed before the
448        # executor got shutdown.
449        assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
450
451    def test_shutdown_no_wait(self):
452        # Ensure that the executor cleans up the threads when calling
453        # shutdown with wait=False
454        executor = futures.ThreadPoolExecutor(max_workers=5)
455        res = executor.map(abs, range(-5, 5))
456        threads = executor._threads
457        executor.shutdown(wait=False)
458        for t in threads:
459            t.join()
460
461        # Make sure the results were all computed before the
462        # executor got shutdown.
463        assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
464
465
466    def test_thread_names_assigned(self):
467        executor = futures.ThreadPoolExecutor(
468            max_workers=5, thread_name_prefix='SpecialPool')
469        executor.map(abs, range(-5, 5))
470        threads = executor._threads
471        del executor
472        support.gc_collect()  # For PyPy or other GCs.
473
474        for t in threads:
475            self.assertRegex(t.name, r'^SpecialPool_[0-4]$')
476            t.join()
477
478    def test_thread_names_default(self):
479        executor = futures.ThreadPoolExecutor(max_workers=5)
480        executor.map(abs, range(-5, 5))
481        threads = executor._threads
482        del executor
483        support.gc_collect()  # For PyPy or other GCs.
484
485        for t in threads:
486            # Ensure that our default name is reasonably sane and unique when
487            # no thread_name_prefix was supplied.
488            self.assertRegex(t.name, r'ThreadPoolExecutor-\d+_[0-4]$')
489            t.join()
490
491    def test_cancel_futures_wait_false(self):
492        # Can only be reliably tested for TPE, since PPE often hangs with
493        # `wait=False` (even without *cancel_futures*).
494        rc, out, err = assert_python_ok('-c', """if True:
495            from concurrent.futures import ThreadPoolExecutor
496            from test.test_concurrent_futures import sleep_and_print
497            if __name__ == "__main__":
498                t = ThreadPoolExecutor()
499                t.submit(sleep_and_print, .1, "apple")
500                t.shutdown(wait=False, cancel_futures=True)
501            """.format(executor_type=self.executor_type.__name__))
502        # Errors in atexit hooks don't change the process exit code, check
503        # stderr manually.
504        self.assertFalse(err)
505        self.assertEqual(out.strip(), b"apple")
506
507
508class ProcessPoolShutdownTest(ExecutorShutdownTest):
509    def _prime_executor(self):
510        pass
511
512    def test_processes_terminate(self):
513        def acquire_lock(lock):
514            lock.acquire()
515
516        mp_context = get_context()
517        sem = mp_context.Semaphore(0)
518        for _ in range(3):
519            self.executor.submit(acquire_lock, sem)
520        self.assertEqual(len(self.executor._processes), 3)
521        for _ in range(3):
522            sem.release()
523        processes = self.executor._processes
524        self.executor.shutdown()
525
526        for p in processes.values():
527            p.join()
528
529    def test_context_manager_shutdown(self):
530        with futures.ProcessPoolExecutor(max_workers=5) as e:
531            processes = e._processes
532            self.assertEqual(list(e.map(abs, range(-5, 5))),
533                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
534
535        for p in processes.values():
536            p.join()
537
538    def test_del_shutdown(self):
539        executor = futures.ProcessPoolExecutor(max_workers=5)
540        res = executor.map(abs, range(-5, 5))
541        executor_manager_thread = executor._executor_manager_thread
542        processes = executor._processes
543        call_queue = executor._call_queue
544        executor_manager_thread = executor._executor_manager_thread
545        del executor
546        support.gc_collect()  # For PyPy or other GCs.
547
548        # Make sure that all the executor resources were properly cleaned by
549        # the shutdown process
550        executor_manager_thread.join()
551        for p in processes.values():
552            p.join()
553        call_queue.join_thread()
554
555        # Make sure the results were all computed before the
556        # executor got shutdown.
557        assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
558
559    def test_shutdown_no_wait(self):
560        # Ensure that the executor cleans up the processes when calling
561        # shutdown with wait=False
562        executor = futures.ProcessPoolExecutor(max_workers=5)
563        res = executor.map(abs, range(-5, 5))
564        processes = executor._processes
565        call_queue = executor._call_queue
566        executor_manager_thread = executor._executor_manager_thread
567        executor.shutdown(wait=False)
568
569        # Make sure that all the executor resources were properly cleaned by
570        # the shutdown process
571        executor_manager_thread.join()
572        for p in processes.values():
573            p.join()
574        call_queue.join_thread()
575
576        # Make sure the results were all computed before the executor got
577        # shutdown.
578        assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
579
580
581create_executor_tests(ProcessPoolShutdownTest,
582                      executor_mixins=(ProcessPoolForkMixin,
583                                       ProcessPoolForkserverMixin,
584                                       ProcessPoolSpawnMixin))
585
586
587class WaitTests:
588    def test_20369(self):
589        # See https://bugs.python.org/issue20369
590        future = self.executor.submit(time.sleep, 1.5)
591        done, not_done = futures.wait([future, future],
592                            return_when=futures.ALL_COMPLETED)
593        self.assertEqual({future}, done)
594        self.assertEqual(set(), not_done)
595
596
597    def test_first_completed(self):
598        future1 = self.executor.submit(mul, 21, 2)
599        future2 = self.executor.submit(time.sleep, 1.5)
600
601        done, not_done = futures.wait(
602                [CANCELLED_FUTURE, future1, future2],
603                 return_when=futures.FIRST_COMPLETED)
604
605        self.assertEqual(set([future1]), done)
606        self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)
607
608    def test_first_completed_some_already_completed(self):
609        future1 = self.executor.submit(time.sleep, 1.5)
610
611        finished, pending = futures.wait(
612                 [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
613                 return_when=futures.FIRST_COMPLETED)
614
615        self.assertEqual(
616                set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
617                finished)
618        self.assertEqual(set([future1]), pending)
619
620    def test_first_exception(self):
621        future1 = self.executor.submit(mul, 2, 21)
622        future2 = self.executor.submit(sleep_and_raise, 1.5)
623        future3 = self.executor.submit(time.sleep, 3)
624
625        finished, pending = futures.wait(
626                [future1, future2, future3],
627                return_when=futures.FIRST_EXCEPTION)
628
629        self.assertEqual(set([future1, future2]), finished)
630        self.assertEqual(set([future3]), pending)
631
632    def test_first_exception_some_already_complete(self):
633        future1 = self.executor.submit(divmod, 21, 0)
634        future2 = self.executor.submit(time.sleep, 1.5)
635
636        finished, pending = futures.wait(
637                [SUCCESSFUL_FUTURE,
638                 CANCELLED_FUTURE,
639                 CANCELLED_AND_NOTIFIED_FUTURE,
640                 future1, future2],
641                return_when=futures.FIRST_EXCEPTION)
642
643        self.assertEqual(set([SUCCESSFUL_FUTURE,
644                              CANCELLED_AND_NOTIFIED_FUTURE,
645                              future1]), finished)
646        self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)
647
648    def test_first_exception_one_already_failed(self):
649        future1 = self.executor.submit(time.sleep, 2)
650
651        finished, pending = futures.wait(
652                 [EXCEPTION_FUTURE, future1],
653                 return_when=futures.FIRST_EXCEPTION)
654
655        self.assertEqual(set([EXCEPTION_FUTURE]), finished)
656        self.assertEqual(set([future1]), pending)
657
658    def test_all_completed(self):
659        future1 = self.executor.submit(divmod, 2, 0)
660        future2 = self.executor.submit(mul, 2, 21)
661
662        finished, pending = futures.wait(
663                [SUCCESSFUL_FUTURE,
664                 CANCELLED_AND_NOTIFIED_FUTURE,
665                 EXCEPTION_FUTURE,
666                 future1,
667                 future2],
668                return_when=futures.ALL_COMPLETED)
669
670        self.assertEqual(set([SUCCESSFUL_FUTURE,
671                              CANCELLED_AND_NOTIFIED_FUTURE,
672                              EXCEPTION_FUTURE,
673                              future1,
674                              future2]), finished)
675        self.assertEqual(set(), pending)
676
677    def test_timeout(self):
678        future1 = self.executor.submit(mul, 6, 7)
679        future2 = self.executor.submit(time.sleep, 6)
680
681        finished, pending = futures.wait(
682                [CANCELLED_AND_NOTIFIED_FUTURE,
683                 EXCEPTION_FUTURE,
684                 SUCCESSFUL_FUTURE,
685                 future1, future2],
686                timeout=5,
687                return_when=futures.ALL_COMPLETED)
688
689        self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
690                              EXCEPTION_FUTURE,
691                              SUCCESSFUL_FUTURE,
692                              future1]), finished)
693        self.assertEqual(set([future2]), pending)
694
695
696class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, BaseTestCase):
697
698    def test_pending_calls_race(self):
699        # Issue #14406: multi-threaded race condition when waiting on all
700        # futures.
701        event = threading.Event()
702        def future_func():
703            event.wait()
704        oldswitchinterval = sys.getswitchinterval()
705        sys.setswitchinterval(1e-6)
706        try:
707            fs = {self.executor.submit(future_func) for i in range(100)}
708            event.set()
709            futures.wait(fs, return_when=futures.ALL_COMPLETED)
710        finally:
711            sys.setswitchinterval(oldswitchinterval)
712
713
714create_executor_tests(WaitTests,
715                      executor_mixins=(ProcessPoolForkMixin,
716                                       ProcessPoolForkserverMixin,
717                                       ProcessPoolSpawnMixin))
718
719
720class AsCompletedTests:
721    # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
722    def test_no_timeout(self):
723        future1 = self.executor.submit(mul, 2, 21)
724        future2 = self.executor.submit(mul, 7, 6)
725
726        completed = set(futures.as_completed(
727                [CANCELLED_AND_NOTIFIED_FUTURE,
728                 EXCEPTION_FUTURE,
729                 SUCCESSFUL_FUTURE,
730                 future1, future2]))
731        self.assertEqual(set(
732                [CANCELLED_AND_NOTIFIED_FUTURE,
733                 EXCEPTION_FUTURE,
734                 SUCCESSFUL_FUTURE,
735                 future1, future2]),
736                completed)
737
738    def test_zero_timeout(self):
739        future1 = self.executor.submit(time.sleep, 2)
740        completed_futures = set()
741        try:
742            for future in futures.as_completed(
743                    [CANCELLED_AND_NOTIFIED_FUTURE,
744                     EXCEPTION_FUTURE,
745                     SUCCESSFUL_FUTURE,
746                     future1],
747                    timeout=0):
748                completed_futures.add(future)
749        except futures.TimeoutError:
750            pass
751
752        self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
753                              EXCEPTION_FUTURE,
754                              SUCCESSFUL_FUTURE]),
755                         completed_futures)
756
757    def test_duplicate_futures(self):
758        # Issue 20367. Duplicate futures should not raise exceptions or give
759        # duplicate responses.
760        # Issue #31641: accept arbitrary iterables.
761        future1 = self.executor.submit(time.sleep, 2)
762        completed = [
763            f for f in futures.as_completed(itertools.repeat(future1, 3))
764        ]
765        self.assertEqual(len(completed), 1)
766
767    def test_free_reference_yielded_future(self):
768        # Issue #14406: Generator should not keep references
769        # to finished futures.
770        futures_list = [Future() for _ in range(8)]
771        futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED))
772        futures_list.append(create_future(state=FINISHED, result=42))
773
774        with self.assertRaises(futures.TimeoutError):
775            for future in futures.as_completed(futures_list, timeout=0):
776                futures_list.remove(future)
777                wr = weakref.ref(future)
778                del future
779                support.gc_collect()  # For PyPy or other GCs.
780                self.assertIsNone(wr())
781
782        futures_list[0].set_result("test")
783        for future in futures.as_completed(futures_list):
784            futures_list.remove(future)
785            wr = weakref.ref(future)
786            del future
787            support.gc_collect()  # For PyPy or other GCs.
788            self.assertIsNone(wr())
789            if futures_list:
790                futures_list[0].set_result("test")
791
792    def test_correct_timeout_exception_msg(self):
793        futures_list = [CANCELLED_AND_NOTIFIED_FUTURE, PENDING_FUTURE,
794                        RUNNING_FUTURE, SUCCESSFUL_FUTURE]
795
796        with self.assertRaises(futures.TimeoutError) as cm:
797            list(futures.as_completed(futures_list, timeout=0))
798
799        self.assertEqual(str(cm.exception), '2 (of 4) futures unfinished')
800
801
802create_executor_tests(AsCompletedTests)
803
804
805class ExecutorTest:
806    # Executor.shutdown() and context manager usage is tested by
807    # ExecutorShutdownTest.
808    def test_submit(self):
809        future = self.executor.submit(pow, 2, 8)
810        self.assertEqual(256, future.result())
811
812    def test_submit_keyword(self):
813        future = self.executor.submit(mul, 2, y=8)
814        self.assertEqual(16, future.result())
815        future = self.executor.submit(capture, 1, self=2, fn=3)
816        self.assertEqual(future.result(), ((1,), {'self': 2, 'fn': 3}))
817        with self.assertRaises(TypeError):
818            self.executor.submit(fn=capture, arg=1)
819        with self.assertRaises(TypeError):
820            self.executor.submit(arg=1)
821
822    def test_map(self):
823        self.assertEqual(
824                list(self.executor.map(pow, range(10), range(10))),
825                list(map(pow, range(10), range(10))))
826
827        self.assertEqual(
828                list(self.executor.map(pow, range(10), range(10), chunksize=3)),
829                list(map(pow, range(10), range(10))))
830
831    def test_map_exception(self):
832        i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
833        self.assertEqual(i.__next__(), (0, 1))
834        self.assertEqual(i.__next__(), (0, 1))
835        self.assertRaises(ZeroDivisionError, i.__next__)
836
837    def test_map_timeout(self):
838        results = []
839        try:
840            for i in self.executor.map(time.sleep,
841                                       [0, 0, 6],
842                                       timeout=5):
843                results.append(i)
844        except futures.TimeoutError:
845            pass
846        else:
847            self.fail('expected TimeoutError')
848
849        self.assertEqual([None, None], results)
850
851    def test_shutdown_race_issue12456(self):
852        # Issue #12456: race condition at shutdown where trying to post a
853        # sentinel in the call queue blocks (the queue is full while processes
854        # have exited).
855        self.executor.map(str, [2] * (self.worker_count + 1))
856        self.executor.shutdown()
857
858    @support.cpython_only
859    def test_no_stale_references(self):
860        # Issue #16284: check that the executors don't unnecessarily hang onto
861        # references.
862        my_object = MyObject()
863        my_object_collected = threading.Event()
864        my_object_callback = weakref.ref(
865            my_object, lambda obj: my_object_collected.set())
866        # Deliberately discarding the future.
867        self.executor.submit(my_object.my_method)
868        del my_object
869
870        collected = my_object_collected.wait(timeout=support.SHORT_TIMEOUT)
871        self.assertTrue(collected,
872                        "Stale reference not collected within timeout.")
873
874    def test_max_workers_negative(self):
875        for number in (0, -1):
876            with self.assertRaisesRegex(ValueError,
877                                        "max_workers must be greater "
878                                        "than 0"):
879                self.executor_type(max_workers=number)
880
881    def test_free_reference(self):
882        # Issue #14406: Result iterator should not keep an internal
883        # reference to result objects.
884        for obj in self.executor.map(make_dummy_object, range(10)):
885            wr = weakref.ref(obj)
886            del obj
887            support.gc_collect()  # For PyPy or other GCs.
888            self.assertIsNone(wr())
889
890
891class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, BaseTestCase):
892    def test_map_submits_without_iteration(self):
893        """Tests verifying issue 11777."""
894        finished = []
895        def record_finished(n):
896            finished.append(n)
897
898        self.executor.map(record_finished, range(10))
899        self.executor.shutdown(wait=True)
900        self.assertCountEqual(finished, range(10))
901
902    def test_default_workers(self):
903        executor = self.executor_type()
904        expected = min(32, (os.cpu_count() or 1) + 4)
905        self.assertEqual(executor._max_workers, expected)
906
907    def test_saturation(self):
908        executor = self.executor_type(4)
909        def acquire_lock(lock):
910            lock.acquire()
911
912        sem = threading.Semaphore(0)
913        for i in range(15 * executor._max_workers):
914            executor.submit(acquire_lock, sem)
915        self.assertEqual(len(executor._threads), executor._max_workers)
916        for i in range(15 * executor._max_workers):
917            sem.release()
918        executor.shutdown(wait=True)
919
920    def test_idle_thread_reuse(self):
921        executor = self.executor_type()
922        executor.submit(mul, 21, 2).result()
923        executor.submit(mul, 6, 7).result()
924        executor.submit(mul, 3, 14).result()
925        self.assertEqual(len(executor._threads), 1)
926        executor.shutdown(wait=True)
927
928    @unittest.skipUnless(hasattr(os, 'register_at_fork'), 'need os.register_at_fork')
929    def test_hang_global_shutdown_lock(self):
930        # bpo-45021: _global_shutdown_lock should be reinitialized in the child
931        # process, otherwise it will never exit
932        def submit(pool):
933            pool.submit(submit, pool)
934
935        with futures.ThreadPoolExecutor(1) as pool:
936            pool.submit(submit, pool)
937
938            for _ in range(50):
939                with futures.ProcessPoolExecutor(1, mp_context=get_context('fork')) as workers:
940                    workers.submit(tuple)
941
942
943class ProcessPoolExecutorTest(ExecutorTest):
944
945    @unittest.skipUnless(sys.platform=='win32', 'Windows-only process limit')
946    def test_max_workers_too_large(self):
947        with self.assertRaisesRegex(ValueError,
948                                    "max_workers must be <= 61"):
949            futures.ProcessPoolExecutor(max_workers=62)
950
951    def test_killed_child(self):
952        # When a child process is abruptly terminated, the whole pool gets
953        # "broken".
954        futures = [self.executor.submit(time.sleep, 3)]
955        # Get one of the processes, and terminate (kill) it
956        p = next(iter(self.executor._processes.values()))
957        p.terminate()
958        for fut in futures:
959            self.assertRaises(BrokenProcessPool, fut.result)
960        # Submitting other jobs fails as well.
961        self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)
962
963    def test_map_chunksize(self):
964        def bad_map():
965            list(self.executor.map(pow, range(40), range(40), chunksize=-1))
966
967        ref = list(map(pow, range(40), range(40)))
968        self.assertEqual(
969            list(self.executor.map(pow, range(40), range(40), chunksize=6)),
970            ref)
971        self.assertEqual(
972            list(self.executor.map(pow, range(40), range(40), chunksize=50)),
973            ref)
974        self.assertEqual(
975            list(self.executor.map(pow, range(40), range(40), chunksize=40)),
976            ref)
977        self.assertRaises(ValueError, bad_map)
978
979    @classmethod
980    def _test_traceback(cls):
981        raise RuntimeError(123) # some comment
982
983    def test_traceback(self):
984        # We want ensure that the traceback from the child process is
985        # contained in the traceback raised in the main process.
986        future = self.executor.submit(self._test_traceback)
987        with self.assertRaises(Exception) as cm:
988            future.result()
989
990        exc = cm.exception
991        self.assertIs(type(exc), RuntimeError)
992        self.assertEqual(exc.args, (123,))
993        cause = exc.__cause__
994        self.assertIs(type(cause), futures.process._RemoteTraceback)
995        self.assertIn('raise RuntimeError(123) # some comment', cause.tb)
996
997        with support.captured_stderr() as f1:
998            try:
999                raise exc
1000            except RuntimeError:
1001                sys.excepthook(*sys.exc_info())
1002        self.assertIn('raise RuntimeError(123) # some comment',
1003                      f1.getvalue())
1004
1005    @hashlib_helper.requires_hashdigest('md5')
1006    def test_ressources_gced_in_workers(self):
1007        # Ensure that argument for a job are correctly gc-ed after the job
1008        # is finished
1009        mgr = get_context(self.ctx).Manager()
1010        obj = EventfulGCObj(mgr)
1011        future = self.executor.submit(id, obj)
1012        future.result()
1013
1014        self.assertTrue(obj.event.wait(timeout=1))
1015
1016        # explicitly destroy the object to ensure that EventfulGCObj.__del__()
1017        # is called while manager is still running.
1018        obj = None
1019        support.gc_collect()
1020
1021        mgr.shutdown()
1022        mgr.join()
1023
1024    def test_saturation(self):
1025        executor = self.executor_type(4)
1026        mp_context = get_context()
1027        sem = mp_context.Semaphore(0)
1028        job_count = 15 * executor._max_workers
1029        try:
1030            for _ in range(job_count):
1031                executor.submit(sem.acquire)
1032            self.assertEqual(len(executor._processes), executor._max_workers)
1033            for _ in range(job_count):
1034                sem.release()
1035        finally:
1036            executor.shutdown()
1037
1038    def test_idle_process_reuse_one(self):
1039        executor = self.executor_type(4)
1040        executor.submit(mul, 21, 2).result()
1041        executor.submit(mul, 6, 7).result()
1042        executor.submit(mul, 3, 14).result()
1043        self.assertEqual(len(executor._processes), 1)
1044        executor.shutdown()
1045
1046    def test_idle_process_reuse_multiple(self):
1047        executor = self.executor_type(4)
1048        executor.submit(mul, 12, 7).result()
1049        executor.submit(mul, 33, 25)
1050        executor.submit(mul, 25, 26).result()
1051        executor.submit(mul, 18, 29)
1052        self.assertLessEqual(len(executor._processes), 2)
1053        executor.shutdown()
1054
1055create_executor_tests(ProcessPoolExecutorTest,
1056                      executor_mixins=(ProcessPoolForkMixin,
1057                                       ProcessPoolForkserverMixin,
1058                                       ProcessPoolSpawnMixin))
1059
1060def _crash(delay=None):
1061    """Induces a segfault."""
1062    if delay:
1063        time.sleep(delay)
1064    import faulthandler
1065    faulthandler.disable()
1066    faulthandler._sigsegv()
1067
1068
1069def _exit():
1070    """Induces a sys exit with exitcode 1."""
1071    sys.exit(1)
1072
1073
1074def _raise_error(Err):
1075    """Function that raises an Exception in process."""
1076    raise Err()
1077
1078
1079def _raise_error_ignore_stderr(Err):
1080    """Function that raises an Exception in process and ignores stderr."""
1081    import io
1082    sys.stderr = io.StringIO()
1083    raise Err()
1084
1085
1086def _return_instance(cls):
1087    """Function that returns a instance of cls."""
1088    return cls()
1089
1090
1091class CrashAtPickle(object):
1092    """Bad object that triggers a segfault at pickling time."""
1093    def __reduce__(self):
1094        _crash()
1095
1096
1097class CrashAtUnpickle(object):
1098    """Bad object that triggers a segfault at unpickling time."""
1099    def __reduce__(self):
1100        return _crash, ()
1101
1102
1103class ExitAtPickle(object):
1104    """Bad object that triggers a process exit at pickling time."""
1105    def __reduce__(self):
1106        _exit()
1107
1108
1109class ExitAtUnpickle(object):
1110    """Bad object that triggers a process exit at unpickling time."""
1111    def __reduce__(self):
1112        return _exit, ()
1113
1114
1115class ErrorAtPickle(object):
1116    """Bad object that triggers an error at pickling time."""
1117    def __reduce__(self):
1118        from pickle import PicklingError
1119        raise PicklingError("Error in pickle")
1120
1121
1122class ErrorAtUnpickle(object):
1123    """Bad object that triggers an error at unpickling time."""
1124    def __reduce__(self):
1125        from pickle import UnpicklingError
1126        return _raise_error_ignore_stderr, (UnpicklingError, )
1127
1128
1129class ExecutorDeadlockTest:
1130    TIMEOUT = support.SHORT_TIMEOUT
1131
1132    def _fail_on_deadlock(self, executor):
1133        # If we did not recover before TIMEOUT seconds, consider that the
1134        # executor is in a deadlock state and forcefully clean all its
1135        # composants.
1136        import faulthandler
1137        from tempfile import TemporaryFile
1138        with TemporaryFile(mode="w+") as f:
1139            faulthandler.dump_traceback(file=f)
1140            f.seek(0)
1141            tb = f.read()
1142        for p in executor._processes.values():
1143            p.terminate()
1144        # This should be safe to call executor.shutdown here as all possible
1145        # deadlocks should have been broken.
1146        executor.shutdown(wait=True)
1147        print(f"\nTraceback:\n {tb}", file=sys.__stderr__)
1148        self.fail(f"Executor deadlock:\n\n{tb}")
1149
1150
1151    def _check_crash(self, error, func, *args, ignore_stderr=False):
1152        # test for deadlock caused by crashes in a pool
1153        self.executor.shutdown(wait=True)
1154
1155        executor = self.executor_type(
1156            max_workers=2, mp_context=get_context(self.ctx))
1157        res = executor.submit(func, *args)
1158
1159        if ignore_stderr:
1160            cm = support.captured_stderr()
1161        else:
1162            cm = contextlib.nullcontext()
1163
1164        try:
1165            with self.assertRaises(error):
1166                with cm:
1167                    res.result(timeout=self.TIMEOUT)
1168        except futures.TimeoutError:
1169            # If we did not recover before TIMEOUT seconds,
1170            # consider that the executor is in a deadlock state
1171            self._fail_on_deadlock(executor)
1172        executor.shutdown(wait=True)
1173
1174    def test_error_at_task_pickle(self):
1175        # Check problem occurring while pickling a task in
1176        # the task_handler thread
1177        self._check_crash(PicklingError, id, ErrorAtPickle())
1178
1179    def test_exit_at_task_unpickle(self):
1180        # Check problem occurring while unpickling a task on workers
1181        self._check_crash(BrokenProcessPool, id, ExitAtUnpickle())
1182
1183    def test_error_at_task_unpickle(self):
1184        # Check problem occurring while unpickling a task on workers
1185        self._check_crash(BrokenProcessPool, id, ErrorAtUnpickle())
1186
1187    def test_crash_at_task_unpickle(self):
1188        # Check problem occurring while unpickling a task on workers
1189        self._check_crash(BrokenProcessPool, id, CrashAtUnpickle())
1190
1191    def test_crash_during_func_exec_on_worker(self):
1192        # Check problem occurring during func execution on workers
1193        self._check_crash(BrokenProcessPool, _crash)
1194
1195    def test_exit_during_func_exec_on_worker(self):
1196        # Check problem occurring during func execution on workers
1197        self._check_crash(SystemExit, _exit)
1198
1199    def test_error_during_func_exec_on_worker(self):
1200        # Check problem occurring during func execution on workers
1201        self._check_crash(RuntimeError, _raise_error, RuntimeError)
1202
1203    def test_crash_during_result_pickle_on_worker(self):
1204        # Check problem occurring while pickling a task result
1205        # on workers
1206        self._check_crash(BrokenProcessPool, _return_instance, CrashAtPickle)
1207
1208    def test_exit_during_result_pickle_on_worker(self):
1209        # Check problem occurring while pickling a task result
1210        # on workers
1211        self._check_crash(SystemExit, _return_instance, ExitAtPickle)
1212
1213    def test_error_during_result_pickle_on_worker(self):
1214        # Check problem occurring while pickling a task result
1215        # on workers
1216        self._check_crash(PicklingError, _return_instance, ErrorAtPickle)
1217
1218    def test_error_during_result_unpickle_in_result_handler(self):
1219        # Check problem occurring while unpickling a task in
1220        # the result_handler thread
1221        self._check_crash(BrokenProcessPool,
1222                          _return_instance, ErrorAtUnpickle,
1223                          ignore_stderr=True)
1224
1225    def test_exit_during_result_unpickle_in_result_handler(self):
1226        # Check problem occurring while unpickling a task in
1227        # the result_handler thread
1228        self._check_crash(BrokenProcessPool, _return_instance, ExitAtUnpickle)
1229
1230    def test_shutdown_deadlock(self):
1231        # Test that the pool calling shutdown do not cause deadlock
1232        # if a worker fails after the shutdown call.
1233        self.executor.shutdown(wait=True)
1234        with self.executor_type(max_workers=2,
1235                                mp_context=get_context(self.ctx)) as executor:
1236            self.executor = executor  # Allow clean up in fail_on_deadlock
1237            f = executor.submit(_crash, delay=.1)
1238            executor.shutdown(wait=True)
1239            with self.assertRaises(BrokenProcessPool):
1240                f.result()
1241
1242    def test_shutdown_deadlock_pickle(self):
1243        # Test that the pool calling shutdown with wait=False does not cause
1244        # a deadlock if a task fails at pickle after the shutdown call.
1245        # Reported in bpo-39104.
1246        self.executor.shutdown(wait=True)
1247        with self.executor_type(max_workers=2,
1248                                mp_context=get_context(self.ctx)) as executor:
1249            self.executor = executor  # Allow clean up in fail_on_deadlock
1250
1251            # Start the executor and get the executor_manager_thread to collect
1252            # the threads and avoid dangling thread that should be cleaned up
1253            # asynchronously.
1254            executor.submit(id, 42).result()
1255            executor_manager = executor._executor_manager_thread
1256
1257            # Submit a task that fails at pickle and shutdown the executor
1258            # without waiting
1259            f = executor.submit(id, ErrorAtPickle())
1260            executor.shutdown(wait=False)
1261            with self.assertRaises(PicklingError):
1262                f.result()
1263
1264        # Make sure the executor is eventually shutdown and do not leave
1265        # dangling threads
1266        executor_manager.join()
1267
1268
1269create_executor_tests(ExecutorDeadlockTest,
1270                      executor_mixins=(ProcessPoolForkMixin,
1271                                       ProcessPoolForkserverMixin,
1272                                       ProcessPoolSpawnMixin))
1273
1274
1275class FutureTests(BaseTestCase):
1276    def test_done_callback_with_result(self):
1277        callback_result = None
1278        def fn(callback_future):
1279            nonlocal callback_result
1280            callback_result = callback_future.result()
1281
1282        f = Future()
1283        f.add_done_callback(fn)
1284        f.set_result(5)
1285        self.assertEqual(5, callback_result)
1286
1287    def test_done_callback_with_exception(self):
1288        callback_exception = None
1289        def fn(callback_future):
1290            nonlocal callback_exception
1291            callback_exception = callback_future.exception()
1292
1293        f = Future()
1294        f.add_done_callback(fn)
1295        f.set_exception(Exception('test'))
1296        self.assertEqual(('test',), callback_exception.args)
1297
1298    def test_done_callback_with_cancel(self):
1299        was_cancelled = None
1300        def fn(callback_future):
1301            nonlocal was_cancelled
1302            was_cancelled = callback_future.cancelled()
1303
1304        f = Future()
1305        f.add_done_callback(fn)
1306        self.assertTrue(f.cancel())
1307        self.assertTrue(was_cancelled)
1308
1309    def test_done_callback_raises(self):
1310        with support.captured_stderr() as stderr:
1311            raising_was_called = False
1312            fn_was_called = False
1313
1314            def raising_fn(callback_future):
1315                nonlocal raising_was_called
1316                raising_was_called = True
1317                raise Exception('doh!')
1318
1319            def fn(callback_future):
1320                nonlocal fn_was_called
1321                fn_was_called = True
1322
1323            f = Future()
1324            f.add_done_callback(raising_fn)
1325            f.add_done_callback(fn)
1326            f.set_result(5)
1327            self.assertTrue(raising_was_called)
1328            self.assertTrue(fn_was_called)
1329            self.assertIn('Exception: doh!', stderr.getvalue())
1330
1331    def test_done_callback_already_successful(self):
1332        callback_result = None
1333        def fn(callback_future):
1334            nonlocal callback_result
1335            callback_result = callback_future.result()
1336
1337        f = Future()
1338        f.set_result(5)
1339        f.add_done_callback(fn)
1340        self.assertEqual(5, callback_result)
1341
1342    def test_done_callback_already_failed(self):
1343        callback_exception = None
1344        def fn(callback_future):
1345            nonlocal callback_exception
1346            callback_exception = callback_future.exception()
1347
1348        f = Future()
1349        f.set_exception(Exception('test'))
1350        f.add_done_callback(fn)
1351        self.assertEqual(('test',), callback_exception.args)
1352
1353    def test_done_callback_already_cancelled(self):
1354        was_cancelled = None
1355        def fn(callback_future):
1356            nonlocal was_cancelled
1357            was_cancelled = callback_future.cancelled()
1358
1359        f = Future()
1360        self.assertTrue(f.cancel())
1361        f.add_done_callback(fn)
1362        self.assertTrue(was_cancelled)
1363
1364    def test_done_callback_raises_already_succeeded(self):
1365        with support.captured_stderr() as stderr:
1366            def raising_fn(callback_future):
1367                raise Exception('doh!')
1368
1369            f = Future()
1370
1371            # Set the result first to simulate a future that runs instantly,
1372            # effectively allowing the callback to be run immediately.
1373            f.set_result(5)
1374            f.add_done_callback(raising_fn)
1375
1376            self.assertIn('exception calling callback for', stderr.getvalue())
1377            self.assertIn('doh!', stderr.getvalue())
1378
1379
1380    def test_repr(self):
1381        self.assertRegex(repr(PENDING_FUTURE),
1382                         '<Future at 0x[0-9a-f]+ state=pending>')
1383        self.assertRegex(repr(RUNNING_FUTURE),
1384                         '<Future at 0x[0-9a-f]+ state=running>')
1385        self.assertRegex(repr(CANCELLED_FUTURE),
1386                         '<Future at 0x[0-9a-f]+ state=cancelled>')
1387        self.assertRegex(repr(CANCELLED_AND_NOTIFIED_FUTURE),
1388                         '<Future at 0x[0-9a-f]+ state=cancelled>')
1389        self.assertRegex(
1390                repr(EXCEPTION_FUTURE),
1391                '<Future at 0x[0-9a-f]+ state=finished raised OSError>')
1392        self.assertRegex(
1393                repr(SUCCESSFUL_FUTURE),
1394                '<Future at 0x[0-9a-f]+ state=finished returned int>')
1395
1396
1397    def test_cancel(self):
1398        f1 = create_future(state=PENDING)
1399        f2 = create_future(state=RUNNING)
1400        f3 = create_future(state=CANCELLED)
1401        f4 = create_future(state=CANCELLED_AND_NOTIFIED)
1402        f5 = create_future(state=FINISHED, exception=OSError())
1403        f6 = create_future(state=FINISHED, result=5)
1404
1405        self.assertTrue(f1.cancel())
1406        self.assertEqual(f1._state, CANCELLED)
1407
1408        self.assertFalse(f2.cancel())
1409        self.assertEqual(f2._state, RUNNING)
1410
1411        self.assertTrue(f3.cancel())
1412        self.assertEqual(f3._state, CANCELLED)
1413
1414        self.assertTrue(f4.cancel())
1415        self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED)
1416
1417        self.assertFalse(f5.cancel())
1418        self.assertEqual(f5._state, FINISHED)
1419
1420        self.assertFalse(f6.cancel())
1421        self.assertEqual(f6._state, FINISHED)
1422
1423    def test_cancelled(self):
1424        self.assertFalse(PENDING_FUTURE.cancelled())
1425        self.assertFalse(RUNNING_FUTURE.cancelled())
1426        self.assertTrue(CANCELLED_FUTURE.cancelled())
1427        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled())
1428        self.assertFalse(EXCEPTION_FUTURE.cancelled())
1429        self.assertFalse(SUCCESSFUL_FUTURE.cancelled())
1430
1431    def test_done(self):
1432        self.assertFalse(PENDING_FUTURE.done())
1433        self.assertFalse(RUNNING_FUTURE.done())
1434        self.assertTrue(CANCELLED_FUTURE.done())
1435        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done())
1436        self.assertTrue(EXCEPTION_FUTURE.done())
1437        self.assertTrue(SUCCESSFUL_FUTURE.done())
1438
1439    def test_running(self):
1440        self.assertFalse(PENDING_FUTURE.running())
1441        self.assertTrue(RUNNING_FUTURE.running())
1442        self.assertFalse(CANCELLED_FUTURE.running())
1443        self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running())
1444        self.assertFalse(EXCEPTION_FUTURE.running())
1445        self.assertFalse(SUCCESSFUL_FUTURE.running())
1446
1447    def test_result_with_timeout(self):
1448        self.assertRaises(futures.TimeoutError,
1449                          PENDING_FUTURE.result, timeout=0)
1450        self.assertRaises(futures.TimeoutError,
1451                          RUNNING_FUTURE.result, timeout=0)
1452        self.assertRaises(futures.CancelledError,
1453                          CANCELLED_FUTURE.result, timeout=0)
1454        self.assertRaises(futures.CancelledError,
1455                          CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0)
1456        self.assertRaises(OSError, EXCEPTION_FUTURE.result, timeout=0)
1457        self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)
1458
1459    def test_result_with_success(self):
1460        # TODO(brian@sweetapp.com): This test is timing dependent.
1461        def notification():
1462            # Wait until the main thread is waiting for the result.
1463            time.sleep(1)
1464            f1.set_result(42)
1465
1466        f1 = create_future(state=PENDING)
1467        t = threading.Thread(target=notification)
1468        t.start()
1469
1470        self.assertEqual(f1.result(timeout=5), 42)
1471        t.join()
1472
1473    def test_result_with_cancel(self):
1474        # TODO(brian@sweetapp.com): This test is timing dependent.
1475        def notification():
1476            # Wait until the main thread is waiting for the result.
1477            time.sleep(1)
1478            f1.cancel()
1479
1480        f1 = create_future(state=PENDING)
1481        t = threading.Thread(target=notification)
1482        t.start()
1483
1484        self.assertRaises(futures.CancelledError,
1485                          f1.result, timeout=support.SHORT_TIMEOUT)
1486        t.join()
1487
1488    def test_exception_with_timeout(self):
1489        self.assertRaises(futures.TimeoutError,
1490                          PENDING_FUTURE.exception, timeout=0)
1491        self.assertRaises(futures.TimeoutError,
1492                          RUNNING_FUTURE.exception, timeout=0)
1493        self.assertRaises(futures.CancelledError,
1494                          CANCELLED_FUTURE.exception, timeout=0)
1495        self.assertRaises(futures.CancelledError,
1496                          CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0)
1497        self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0),
1498                                   OSError))
1499        self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None)
1500
1501    def test_exception_with_success(self):
1502        def notification():
1503            # Wait until the main thread is waiting for the exception.
1504            time.sleep(1)
1505            with f1._condition:
1506                f1._state = FINISHED
1507                f1._exception = OSError()
1508                f1._condition.notify_all()
1509
1510        f1 = create_future(state=PENDING)
1511        t = threading.Thread(target=notification)
1512        t.start()
1513
1514        self.assertTrue(isinstance(f1.exception(timeout=support.SHORT_TIMEOUT), OSError))
1515        t.join()
1516
1517    def test_multiple_set_result(self):
1518        f = create_future(state=PENDING)
1519        f.set_result(1)
1520
1521        with self.assertRaisesRegex(
1522                futures.InvalidStateError,
1523                'FINISHED: <Future at 0x[0-9a-f]+ '
1524                'state=finished returned int>'
1525        ):
1526            f.set_result(2)
1527
1528        self.assertTrue(f.done())
1529        self.assertEqual(f.result(), 1)
1530
1531    def test_multiple_set_exception(self):
1532        f = create_future(state=PENDING)
1533        e = ValueError()
1534        f.set_exception(e)
1535
1536        with self.assertRaisesRegex(
1537                futures.InvalidStateError,
1538                'FINISHED: <Future at 0x[0-9a-f]+ '
1539                'state=finished raised ValueError>'
1540        ):
1541            f.set_exception(Exception())
1542
1543        self.assertEqual(f.exception(), e)
1544
1545
1546def setUpModule():
1547    unittest.addModuleCleanup(multiprocessing.util._cleanup_tests)
1548    thread_info = threading_helper.threading_setup()
1549    unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info)
1550
1551
1552if __name__ == "__main__":
1553    unittest.main()
1554