• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1from test import support
2
3# Skip tests if _multiprocessing wasn't built.
4support.import_module('_multiprocessing')
5# Skip tests if sem_open implementation is broken.
6support.skip_if_broken_multiprocessing_synchronize()
7
8from test.support import hashlib_helper
9from test.support.script_helper import assert_python_ok
10
11import contextlib
12import itertools
13import logging
14from logging.handlers import QueueHandler
15import os
16import queue
17import sys
18import threading
19import time
20import unittest
21import weakref
22from pickle import PicklingError
23
24from concurrent import futures
25from concurrent.futures._base import (
26    PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future,
27    BrokenExecutor)
28from concurrent.futures.process import BrokenProcessPool
29from multiprocessing import get_context
30
31import multiprocessing.process
32import multiprocessing.util
33
34
35def create_future(state=PENDING, exception=None, result=None):
36    f = Future()
37    f._state = state
38    f._exception = exception
39    f._result = result
40    return f
41
42
43PENDING_FUTURE = create_future(state=PENDING)
44RUNNING_FUTURE = create_future(state=RUNNING)
45CANCELLED_FUTURE = create_future(state=CANCELLED)
46CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
47EXCEPTION_FUTURE = create_future(state=FINISHED, exception=OSError())
48SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)
49
50INITIALIZER_STATUS = 'uninitialized'
51
52
53def mul(x, y):
54    return x * y
55
56def capture(*args, **kwargs):
57    return args, kwargs
58
59def sleep_and_raise(t):
60    time.sleep(t)
61    raise Exception('this is an exception')
62
63def sleep_and_print(t, msg):
64    time.sleep(t)
65    print(msg)
66    sys.stdout.flush()
67
68def init(x):
69    global INITIALIZER_STATUS
70    INITIALIZER_STATUS = x
71
72def get_init_status():
73    return INITIALIZER_STATUS
74
75def init_fail(log_queue=None):
76    if log_queue is not None:
77        logger = logging.getLogger('concurrent.futures')
78        logger.addHandler(QueueHandler(log_queue))
79        logger.setLevel('CRITICAL')
80        logger.propagate = False
81    time.sleep(0.1)  # let some futures be scheduled
82    raise ValueError('error in initializer')
83
84
85class MyObject(object):
86    def my_method(self):
87        pass
88
89
90class EventfulGCObj():
91    def __init__(self, mgr):
92        self.event = mgr.Event()
93
94    def __del__(self):
95        self.event.set()
96
97
98def make_dummy_object(_):
99    return MyObject()
100
101
102class BaseTestCase(unittest.TestCase):
103    def setUp(self):
104        self._thread_key = support.threading_setup()
105
106    def tearDown(self):
107        support.reap_children()
108        support.threading_cleanup(*self._thread_key)
109
110
111class ExecutorMixin:
112    worker_count = 5
113    executor_kwargs = {}
114
115    def setUp(self):
116        super().setUp()
117
118        self.t1 = time.monotonic()
119        if hasattr(self, "ctx"):
120            self.executor = self.executor_type(
121                max_workers=self.worker_count,
122                mp_context=self.get_context(),
123                **self.executor_kwargs)
124        else:
125            self.executor = self.executor_type(
126                max_workers=self.worker_count,
127                **self.executor_kwargs)
128        self._prime_executor()
129
130    def tearDown(self):
131        self.executor.shutdown(wait=True)
132        self.executor = None
133
134        dt = time.monotonic() - self.t1
135        if support.verbose:
136            print("%.2fs" % dt, end=' ')
137        self.assertLess(dt, 300, "synchronization issue: test lasted too long")
138
139        super().tearDown()
140
141    def get_context(self):
142        return get_context(self.ctx)
143
144    def _prime_executor(self):
145        # Make sure that the executor is ready to do work before running the
146        # tests. This should reduce the probability of timeouts in the tests.
147        futures = [self.executor.submit(time.sleep, 0.1)
148                   for _ in range(self.worker_count)]
149        for f in futures:
150            f.result()
151
152
153class ThreadPoolMixin(ExecutorMixin):
154    executor_type = futures.ThreadPoolExecutor
155
156
157class ProcessPoolForkMixin(ExecutorMixin):
158    executor_type = futures.ProcessPoolExecutor
159    ctx = "fork"
160
161    def get_context(self):
162        if sys.platform == "win32":
163            self.skipTest("require unix system")
164        return super().get_context()
165
166
167class ProcessPoolSpawnMixin(ExecutorMixin):
168    executor_type = futures.ProcessPoolExecutor
169    ctx = "spawn"
170
171
172class ProcessPoolForkserverMixin(ExecutorMixin):
173    executor_type = futures.ProcessPoolExecutor
174    ctx = "forkserver"
175
176    def get_context(self):
177        if sys.platform == "win32":
178            self.skipTest("require unix system")
179        return super().get_context()
180
181
182def create_executor_tests(mixin, bases=(BaseTestCase,),
183                          executor_mixins=(ThreadPoolMixin,
184                                           ProcessPoolForkMixin,
185                                           ProcessPoolForkserverMixin,
186                                           ProcessPoolSpawnMixin)):
187    def strip_mixin(name):
188        if name.endswith(('Mixin', 'Tests')):
189            return name[:-5]
190        elif name.endswith('Test'):
191            return name[:-4]
192        else:
193            return name
194
195    for exe in executor_mixins:
196        name = ("%s%sTest"
197                % (strip_mixin(exe.__name__), strip_mixin(mixin.__name__)))
198        cls = type(name, (mixin,) + (exe,) + bases, {})
199        globals()[name] = cls
200
201
202class InitializerMixin(ExecutorMixin):
203    worker_count = 2
204
205    def setUp(self):
206        global INITIALIZER_STATUS
207        INITIALIZER_STATUS = 'uninitialized'
208        self.executor_kwargs = dict(initializer=init,
209                                    initargs=('initialized',))
210        super().setUp()
211
212    def test_initializer(self):
213        futures = [self.executor.submit(get_init_status)
214                   for _ in range(self.worker_count)]
215
216        for f in futures:
217            self.assertEqual(f.result(), 'initialized')
218
219
220class FailingInitializerMixin(ExecutorMixin):
221    worker_count = 2
222
223    def setUp(self):
224        if hasattr(self, "ctx"):
225            # Pass a queue to redirect the child's logging output
226            self.mp_context = self.get_context()
227            self.log_queue = self.mp_context.Queue()
228            self.executor_kwargs = dict(initializer=init_fail,
229                                        initargs=(self.log_queue,))
230        else:
231            # In a thread pool, the child shares our logging setup
232            # (see _assert_logged())
233            self.mp_context = None
234            self.log_queue = None
235            self.executor_kwargs = dict(initializer=init_fail)
236        super().setUp()
237
238    def test_initializer(self):
239        with self._assert_logged('ValueError: error in initializer'):
240            try:
241                future = self.executor.submit(get_init_status)
242            except BrokenExecutor:
243                # Perhaps the executor is already broken
244                pass
245            else:
246                with self.assertRaises(BrokenExecutor):
247                    future.result()
248            # At some point, the executor should break
249            t1 = time.monotonic()
250            while not self.executor._broken:
251                if time.monotonic() - t1 > 5:
252                    self.fail("executor not broken after 5 s.")
253                time.sleep(0.01)
254            # ... and from this point submit() is guaranteed to fail
255            with self.assertRaises(BrokenExecutor):
256                self.executor.submit(get_init_status)
257
258    def _prime_executor(self):
259        pass
260
261    @contextlib.contextmanager
262    def _assert_logged(self, msg):
263        if self.log_queue is not None:
264            yield
265            output = []
266            try:
267                while True:
268                    output.append(self.log_queue.get_nowait().getMessage())
269            except queue.Empty:
270                pass
271        else:
272            with self.assertLogs('concurrent.futures', 'CRITICAL') as cm:
273                yield
274            output = cm.output
275        self.assertTrue(any(msg in line for line in output),
276                        output)
277
278
279create_executor_tests(InitializerMixin)
280create_executor_tests(FailingInitializerMixin)
281
282
283class ExecutorShutdownTest:
284    def test_run_after_shutdown(self):
285        self.executor.shutdown()
286        self.assertRaises(RuntimeError,
287                          self.executor.submit,
288                          pow, 2, 5)
289
290    def test_interpreter_shutdown(self):
291        # Test the atexit hook for shutdown of worker threads and processes
292        rc, out, err = assert_python_ok('-c', """if 1:
293            from concurrent.futures import {executor_type}
294            from time import sleep
295            from test.test_concurrent_futures import sleep_and_print
296            if __name__ == "__main__":
297                context = '{context}'
298                if context == "":
299                    t = {executor_type}(5)
300                else:
301                    from multiprocessing import get_context
302                    context = get_context(context)
303                    t = {executor_type}(5, mp_context=context)
304                t.submit(sleep_and_print, 1.0, "apple")
305            """.format(executor_type=self.executor_type.__name__,
306                       context=getattr(self, "ctx", "")))
307        # Errors in atexit hooks don't change the process exit code, check
308        # stderr manually.
309        self.assertFalse(err)
310        self.assertEqual(out.strip(), b"apple")
311
312    def test_submit_after_interpreter_shutdown(self):
313        # Test the atexit hook for shutdown of worker threads and processes
314        rc, out, err = assert_python_ok('-c', """if 1:
315            import atexit
316            @atexit.register
317            def run_last():
318                try:
319                    t.submit(id, None)
320                except RuntimeError:
321                    print("runtime-error")
322                    raise
323            from concurrent.futures import {executor_type}
324            if __name__ == "__main__":
325                context = '{context}'
326                if not context:
327                    t = {executor_type}(5)
328                else:
329                    from multiprocessing import get_context
330                    context = get_context(context)
331                    t = {executor_type}(5, mp_context=context)
332                    t.submit(id, 42).result()
333            """.format(executor_type=self.executor_type.__name__,
334                       context=getattr(self, "ctx", "")))
335        # Errors in atexit hooks don't change the process exit code, check
336        # stderr manually.
337        self.assertIn("RuntimeError: cannot schedule new futures", err.decode())
338        self.assertEqual(out.strip(), b"runtime-error")
339
340    def test_hang_issue12364(self):
341        fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
342        self.executor.shutdown()
343        for f in fs:
344            f.result()
345
346    def test_cancel_futures(self):
347        executor = self.executor_type(max_workers=3)
348        fs = [executor.submit(time.sleep, .1) for _ in range(50)]
349        executor.shutdown(cancel_futures=True)
350        # We can't guarantee the exact number of cancellations, but we can
351        # guarantee that *some* were cancelled. With setting max_workers to 3,
352        # most of the submitted futures should have been cancelled.
353        cancelled = [fut for fut in fs if fut.cancelled()]
354        self.assertTrue(len(cancelled) >= 35, msg=f"{len(cancelled)=}")
355
356        # Ensure the other futures were able to finish.
357        # Use "not fut.cancelled()" instead of "fut.done()" to include futures
358        # that may have been left in a pending state.
359        others = [fut for fut in fs if not fut.cancelled()]
360        for fut in others:
361            self.assertTrue(fut.done(), msg=f"{fut._state=}")
362            self.assertIsNone(fut.exception())
363
364        # Similar to the number of cancelled futures, we can't guarantee the
365        # exact number that completed. But, we can guarantee that at least
366        # one finished.
367        self.assertTrue(len(others) > 0, msg=f"{len(others)=}")
368
369    def test_hang_issue39205(self):
370        """shutdown(wait=False) doesn't hang at exit with running futures.
371
372        See https://bugs.python.org/issue39205.
373        """
374        if self.executor_type == futures.ProcessPoolExecutor:
375            raise unittest.SkipTest(
376                "Hangs due to https://bugs.python.org/issue39205")
377
378        rc, out, err = assert_python_ok('-c', """if True:
379            from concurrent.futures import {executor_type}
380            from test.test_concurrent_futures import sleep_and_print
381            if __name__ == "__main__":
382                t = {executor_type}(max_workers=3)
383                t.submit(sleep_and_print, 1.0, "apple")
384                t.shutdown(wait=False)
385            """.format(executor_type=self.executor_type.__name__))
386        self.assertFalse(err)
387        self.assertEqual(out.strip(), b"apple")
388
389
390class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase):
391    def _prime_executor(self):
392        pass
393
394    def test_threads_terminate(self):
395        def acquire_lock(lock):
396            lock.acquire()
397
398        sem = threading.Semaphore(0)
399        for i in range(3):
400            self.executor.submit(acquire_lock, sem)
401        self.assertEqual(len(self.executor._threads), 3)
402        for i in range(3):
403            sem.release()
404        self.executor.shutdown()
405        for t in self.executor._threads:
406            t.join()
407
408    def test_context_manager_shutdown(self):
409        with futures.ThreadPoolExecutor(max_workers=5) as e:
410            executor = e
411            self.assertEqual(list(e.map(abs, range(-5, 5))),
412                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
413
414        for t in executor._threads:
415            t.join()
416
417    def test_del_shutdown(self):
418        executor = futures.ThreadPoolExecutor(max_workers=5)
419        res = executor.map(abs, range(-5, 5))
420        threads = executor._threads
421        del executor
422
423        for t in threads:
424            t.join()
425
426        # Make sure the results were all computed before the
427        # executor got shutdown.
428        assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
429
430    def test_shutdown_no_wait(self):
431        # Ensure that the executor cleans up the threads when calling
432        # shutdown with wait=False
433        executor = futures.ThreadPoolExecutor(max_workers=5)
434        res = executor.map(abs, range(-5, 5))
435        threads = executor._threads
436        executor.shutdown(wait=False)
437        for t in threads:
438            t.join()
439
440        # Make sure the results were all computed before the
441        # executor got shutdown.
442        assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
443
444
445    def test_thread_names_assigned(self):
446        executor = futures.ThreadPoolExecutor(
447            max_workers=5, thread_name_prefix='SpecialPool')
448        executor.map(abs, range(-5, 5))
449        threads = executor._threads
450        del executor
451
452        for t in threads:
453            self.assertRegex(t.name, r'^SpecialPool_[0-4]$')
454            t.join()
455
456    def test_thread_names_default(self):
457        executor = futures.ThreadPoolExecutor(max_workers=5)
458        executor.map(abs, range(-5, 5))
459        threads = executor._threads
460        del executor
461
462        for t in threads:
463            # Ensure that our default name is reasonably sane and unique when
464            # no thread_name_prefix was supplied.
465            self.assertRegex(t.name, r'ThreadPoolExecutor-\d+_[0-4]$')
466            t.join()
467
468    def test_cancel_futures_wait_false(self):
469        # Can only be reliably tested for TPE, since PPE often hangs with
470        # `wait=False` (even without *cancel_futures*).
471        rc, out, err = assert_python_ok('-c', """if True:
472            from concurrent.futures import ThreadPoolExecutor
473            from test.test_concurrent_futures import sleep_and_print
474            if __name__ == "__main__":
475                t = ThreadPoolExecutor()
476                t.submit(sleep_and_print, .1, "apple")
477                t.shutdown(wait=False, cancel_futures=True)
478            """.format(executor_type=self.executor_type.__name__))
479        # Errors in atexit hooks don't change the process exit code, check
480        # stderr manually.
481        self.assertFalse(err)
482        self.assertEqual(out.strip(), b"apple")
483
484
485class ProcessPoolShutdownTest(ExecutorShutdownTest):
486    def _prime_executor(self):
487        pass
488
489    def test_processes_terminate(self):
490        def acquire_lock(lock):
491            lock.acquire()
492
493        mp_context = get_context()
494        sem = mp_context.Semaphore(0)
495        for _ in range(3):
496            self.executor.submit(acquire_lock, sem)
497        self.assertEqual(len(self.executor._processes), 3)
498        for _ in range(3):
499            sem.release()
500        processes = self.executor._processes
501        self.executor.shutdown()
502
503        for p in processes.values():
504            p.join()
505
506    def test_context_manager_shutdown(self):
507        with futures.ProcessPoolExecutor(max_workers=5) as e:
508            processes = e._processes
509            self.assertEqual(list(e.map(abs, range(-5, 5))),
510                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
511
512        for p in processes.values():
513            p.join()
514
515    def test_del_shutdown(self):
516        executor = futures.ProcessPoolExecutor(max_workers=5)
517        res = executor.map(abs, range(-5, 5))
518        executor_manager_thread = executor._executor_manager_thread
519        processes = executor._processes
520        call_queue = executor._call_queue
521        executor_manager_thread = executor._executor_manager_thread
522        del executor
523
524        # Make sure that all the executor resources were properly cleaned by
525        # the shutdown process
526        executor_manager_thread.join()
527        for p in processes.values():
528            p.join()
529        call_queue.join_thread()
530
531        # Make sure the results were all computed before the
532        # executor got shutdown.
533        assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
534
535    def test_shutdown_no_wait(self):
536        # Ensure that the executor cleans up the processes when calling
537        # shutdown with wait=False
538        executor = futures.ProcessPoolExecutor(max_workers=5)
539        res = executor.map(abs, range(-5, 5))
540        processes = executor._processes
541        call_queue = executor._call_queue
542        executor_manager_thread = executor._executor_manager_thread
543        executor.shutdown(wait=False)
544
545        # Make sure that all the executor resources were properly cleaned by
546        # the shutdown process
547        executor_manager_thread.join()
548        for p in processes.values():
549            p.join()
550        call_queue.join_thread()
551
552        # Make sure the results were all computed before the executor got
553        # shutdown.
554        assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
555
556
557create_executor_tests(ProcessPoolShutdownTest,
558                      executor_mixins=(ProcessPoolForkMixin,
559                                       ProcessPoolForkserverMixin,
560                                       ProcessPoolSpawnMixin))
561
562
563class WaitTests:
564
565    def test_first_completed(self):
566        future1 = self.executor.submit(mul, 21, 2)
567        future2 = self.executor.submit(time.sleep, 1.5)
568
569        done, not_done = futures.wait(
570                [CANCELLED_FUTURE, future1, future2],
571                 return_when=futures.FIRST_COMPLETED)
572
573        self.assertEqual(set([future1]), done)
574        self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)
575
576    def test_first_completed_some_already_completed(self):
577        future1 = self.executor.submit(time.sleep, 1.5)
578
579        finished, pending = futures.wait(
580                 [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
581                 return_when=futures.FIRST_COMPLETED)
582
583        self.assertEqual(
584                set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
585                finished)
586        self.assertEqual(set([future1]), pending)
587
588    def test_first_exception(self):
589        future1 = self.executor.submit(mul, 2, 21)
590        future2 = self.executor.submit(sleep_and_raise, 1.5)
591        future3 = self.executor.submit(time.sleep, 3)
592
593        finished, pending = futures.wait(
594                [future1, future2, future3],
595                return_when=futures.FIRST_EXCEPTION)
596
597        self.assertEqual(set([future1, future2]), finished)
598        self.assertEqual(set([future3]), pending)
599
600    def test_first_exception_some_already_complete(self):
601        future1 = self.executor.submit(divmod, 21, 0)
602        future2 = self.executor.submit(time.sleep, 1.5)
603
604        finished, pending = futures.wait(
605                [SUCCESSFUL_FUTURE,
606                 CANCELLED_FUTURE,
607                 CANCELLED_AND_NOTIFIED_FUTURE,
608                 future1, future2],
609                return_when=futures.FIRST_EXCEPTION)
610
611        self.assertEqual(set([SUCCESSFUL_FUTURE,
612                              CANCELLED_AND_NOTIFIED_FUTURE,
613                              future1]), finished)
614        self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)
615
616    def test_first_exception_one_already_failed(self):
617        future1 = self.executor.submit(time.sleep, 2)
618
619        finished, pending = futures.wait(
620                 [EXCEPTION_FUTURE, future1],
621                 return_when=futures.FIRST_EXCEPTION)
622
623        self.assertEqual(set([EXCEPTION_FUTURE]), finished)
624        self.assertEqual(set([future1]), pending)
625
626    def test_all_completed(self):
627        future1 = self.executor.submit(divmod, 2, 0)
628        future2 = self.executor.submit(mul, 2, 21)
629
630        finished, pending = futures.wait(
631                [SUCCESSFUL_FUTURE,
632                 CANCELLED_AND_NOTIFIED_FUTURE,
633                 EXCEPTION_FUTURE,
634                 future1,
635                 future2],
636                return_when=futures.ALL_COMPLETED)
637
638        self.assertEqual(set([SUCCESSFUL_FUTURE,
639                              CANCELLED_AND_NOTIFIED_FUTURE,
640                              EXCEPTION_FUTURE,
641                              future1,
642                              future2]), finished)
643        self.assertEqual(set(), pending)
644
645    def test_timeout(self):
646        future1 = self.executor.submit(mul, 6, 7)
647        future2 = self.executor.submit(time.sleep, 6)
648
649        finished, pending = futures.wait(
650                [CANCELLED_AND_NOTIFIED_FUTURE,
651                 EXCEPTION_FUTURE,
652                 SUCCESSFUL_FUTURE,
653                 future1, future2],
654                timeout=5,
655                return_when=futures.ALL_COMPLETED)
656
657        self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
658                              EXCEPTION_FUTURE,
659                              SUCCESSFUL_FUTURE,
660                              future1]), finished)
661        self.assertEqual(set([future2]), pending)
662
663
664class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, BaseTestCase):
665
666    def test_pending_calls_race(self):
667        # Issue #14406: multi-threaded race condition when waiting on all
668        # futures.
669        event = threading.Event()
670        def future_func():
671            event.wait()
672        oldswitchinterval = sys.getswitchinterval()
673        sys.setswitchinterval(1e-6)
674        try:
675            fs = {self.executor.submit(future_func) for i in range(100)}
676            event.set()
677            futures.wait(fs, return_when=futures.ALL_COMPLETED)
678        finally:
679            sys.setswitchinterval(oldswitchinterval)
680
681
682create_executor_tests(WaitTests,
683                      executor_mixins=(ProcessPoolForkMixin,
684                                       ProcessPoolForkserverMixin,
685                                       ProcessPoolSpawnMixin))
686
687
688class AsCompletedTests:
689    # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
690    def test_no_timeout(self):
691        future1 = self.executor.submit(mul, 2, 21)
692        future2 = self.executor.submit(mul, 7, 6)
693
694        completed = set(futures.as_completed(
695                [CANCELLED_AND_NOTIFIED_FUTURE,
696                 EXCEPTION_FUTURE,
697                 SUCCESSFUL_FUTURE,
698                 future1, future2]))
699        self.assertEqual(set(
700                [CANCELLED_AND_NOTIFIED_FUTURE,
701                 EXCEPTION_FUTURE,
702                 SUCCESSFUL_FUTURE,
703                 future1, future2]),
704                completed)
705
706    def test_zero_timeout(self):
707        future1 = self.executor.submit(time.sleep, 2)
708        completed_futures = set()
709        try:
710            for future in futures.as_completed(
711                    [CANCELLED_AND_NOTIFIED_FUTURE,
712                     EXCEPTION_FUTURE,
713                     SUCCESSFUL_FUTURE,
714                     future1],
715                    timeout=0):
716                completed_futures.add(future)
717        except futures.TimeoutError:
718            pass
719
720        self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
721                              EXCEPTION_FUTURE,
722                              SUCCESSFUL_FUTURE]),
723                         completed_futures)
724
725    def test_duplicate_futures(self):
726        # Issue 20367. Duplicate futures should not raise exceptions or give
727        # duplicate responses.
728        # Issue #31641: accept arbitrary iterables.
729        future1 = self.executor.submit(time.sleep, 2)
730        completed = [
731            f for f in futures.as_completed(itertools.repeat(future1, 3))
732        ]
733        self.assertEqual(len(completed), 1)
734
735    def test_free_reference_yielded_future(self):
736        # Issue #14406: Generator should not keep references
737        # to finished futures.
738        futures_list = [Future() for _ in range(8)]
739        futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED))
740        futures_list.append(create_future(state=FINISHED, result=42))
741
742        with self.assertRaises(futures.TimeoutError):
743            for future in futures.as_completed(futures_list, timeout=0):
744                futures_list.remove(future)
745                wr = weakref.ref(future)
746                del future
747                self.assertIsNone(wr())
748
749        futures_list[0].set_result("test")
750        for future in futures.as_completed(futures_list):
751            futures_list.remove(future)
752            wr = weakref.ref(future)
753            del future
754            self.assertIsNone(wr())
755            if futures_list:
756                futures_list[0].set_result("test")
757
758    def test_correct_timeout_exception_msg(self):
759        futures_list = [CANCELLED_AND_NOTIFIED_FUTURE, PENDING_FUTURE,
760                        RUNNING_FUTURE, SUCCESSFUL_FUTURE]
761
762        with self.assertRaises(futures.TimeoutError) as cm:
763            list(futures.as_completed(futures_list, timeout=0))
764
765        self.assertEqual(str(cm.exception), '2 (of 4) futures unfinished')
766
767
768create_executor_tests(AsCompletedTests)
769
770
771class ExecutorTest:
772    # Executor.shutdown() and context manager usage is tested by
773    # ExecutorShutdownTest.
774    def test_submit(self):
775        future = self.executor.submit(pow, 2, 8)
776        self.assertEqual(256, future.result())
777
778    def test_submit_keyword(self):
779        future = self.executor.submit(mul, 2, y=8)
780        self.assertEqual(16, future.result())
781        future = self.executor.submit(capture, 1, self=2, fn=3)
782        self.assertEqual(future.result(), ((1,), {'self': 2, 'fn': 3}))
783        with self.assertRaises(TypeError):
784            self.executor.submit(fn=capture, arg=1)
785        with self.assertRaises(TypeError):
786            self.executor.submit(arg=1)
787
788    def test_map(self):
789        self.assertEqual(
790                list(self.executor.map(pow, range(10), range(10))),
791                list(map(pow, range(10), range(10))))
792
793        self.assertEqual(
794                list(self.executor.map(pow, range(10), range(10), chunksize=3)),
795                list(map(pow, range(10), range(10))))
796
797    def test_map_exception(self):
798        i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
799        self.assertEqual(i.__next__(), (0, 1))
800        self.assertEqual(i.__next__(), (0, 1))
801        self.assertRaises(ZeroDivisionError, i.__next__)
802
803    def test_map_timeout(self):
804        results = []
805        try:
806            for i in self.executor.map(time.sleep,
807                                       [0, 0, 6],
808                                       timeout=5):
809                results.append(i)
810        except futures.TimeoutError:
811            pass
812        else:
813            self.fail('expected TimeoutError')
814
815        self.assertEqual([None, None], results)
816
817    def test_shutdown_race_issue12456(self):
818        # Issue #12456: race condition at shutdown where trying to post a
819        # sentinel in the call queue blocks (the queue is full while processes
820        # have exited).
821        self.executor.map(str, [2] * (self.worker_count + 1))
822        self.executor.shutdown()
823
824    @support.cpython_only
825    def test_no_stale_references(self):
826        # Issue #16284: check that the executors don't unnecessarily hang onto
827        # references.
828        my_object = MyObject()
829        my_object_collected = threading.Event()
830        my_object_callback = weakref.ref(
831            my_object, lambda obj: my_object_collected.set())
832        # Deliberately discarding the future.
833        self.executor.submit(my_object.my_method)
834        del my_object
835
836        collected = my_object_collected.wait(timeout=support.SHORT_TIMEOUT)
837        self.assertTrue(collected,
838                        "Stale reference not collected within timeout.")
839
840    def test_max_workers_negative(self):
841        for number in (0, -1):
842            with self.assertRaisesRegex(ValueError,
843                                        "max_workers must be greater "
844                                        "than 0"):
845                self.executor_type(max_workers=number)
846
847    def test_free_reference(self):
848        # Issue #14406: Result iterator should not keep an internal
849        # reference to result objects.
850        for obj in self.executor.map(make_dummy_object, range(10)):
851            wr = weakref.ref(obj)
852            del obj
853            self.assertIsNone(wr())
854
855
856class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, BaseTestCase):
857    def test_map_submits_without_iteration(self):
858        """Tests verifying issue 11777."""
859        finished = []
860        def record_finished(n):
861            finished.append(n)
862
863        self.executor.map(record_finished, range(10))
864        self.executor.shutdown(wait=True)
865        self.assertCountEqual(finished, range(10))
866
867    def test_default_workers(self):
868        executor = self.executor_type()
869        expected = min(32, (os.cpu_count() or 1) + 4)
870        self.assertEqual(executor._max_workers, expected)
871
872    def test_saturation(self):
873        executor = self.executor_type(4)
874        def acquire_lock(lock):
875            lock.acquire()
876
877        sem = threading.Semaphore(0)
878        for i in range(15 * executor._max_workers):
879            executor.submit(acquire_lock, sem)
880        self.assertEqual(len(executor._threads), executor._max_workers)
881        for i in range(15 * executor._max_workers):
882            sem.release()
883        executor.shutdown(wait=True)
884
885    def test_idle_thread_reuse(self):
886        executor = self.executor_type()
887        executor.submit(mul, 21, 2).result()
888        executor.submit(mul, 6, 7).result()
889        executor.submit(mul, 3, 14).result()
890        self.assertEqual(len(executor._threads), 1)
891        executor.shutdown(wait=True)
892
893
894class ProcessPoolExecutorTest(ExecutorTest):
895
896    @unittest.skipUnless(sys.platform=='win32', 'Windows-only process limit')
897    def test_max_workers_too_large(self):
898        with self.assertRaisesRegex(ValueError,
899                                    "max_workers must be <= 61"):
900            futures.ProcessPoolExecutor(max_workers=62)
901
902    def test_killed_child(self):
903        # When a child process is abruptly terminated, the whole pool gets
904        # "broken".
905        futures = [self.executor.submit(time.sleep, 3)]
906        # Get one of the processes, and terminate (kill) it
907        p = next(iter(self.executor._processes.values()))
908        p.terminate()
909        for fut in futures:
910            self.assertRaises(BrokenProcessPool, fut.result)
911        # Submitting other jobs fails as well.
912        self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)
913
914    def test_map_chunksize(self):
915        def bad_map():
916            list(self.executor.map(pow, range(40), range(40), chunksize=-1))
917
918        ref = list(map(pow, range(40), range(40)))
919        self.assertEqual(
920            list(self.executor.map(pow, range(40), range(40), chunksize=6)),
921            ref)
922        self.assertEqual(
923            list(self.executor.map(pow, range(40), range(40), chunksize=50)),
924            ref)
925        self.assertEqual(
926            list(self.executor.map(pow, range(40), range(40), chunksize=40)),
927            ref)
928        self.assertRaises(ValueError, bad_map)
929
930    @classmethod
931    def _test_traceback(cls):
932        raise RuntimeError(123) # some comment
933
934    def test_traceback(self):
935        # We want ensure that the traceback from the child process is
936        # contained in the traceback raised in the main process.
937        future = self.executor.submit(self._test_traceback)
938        with self.assertRaises(Exception) as cm:
939            future.result()
940
941        exc = cm.exception
942        self.assertIs(type(exc), RuntimeError)
943        self.assertEqual(exc.args, (123,))
944        cause = exc.__cause__
945        self.assertIs(type(cause), futures.process._RemoteTraceback)
946        self.assertIn('raise RuntimeError(123) # some comment', cause.tb)
947
948        with support.captured_stderr() as f1:
949            try:
950                raise exc
951            except RuntimeError:
952                sys.excepthook(*sys.exc_info())
953        self.assertIn('raise RuntimeError(123) # some comment',
954                      f1.getvalue())
955
956    @hashlib_helper.requires_hashdigest('md5')
957    def test_ressources_gced_in_workers(self):
958        # Ensure that argument for a job are correctly gc-ed after the job
959        # is finished
960        mgr = get_context(self.ctx).Manager()
961        obj = EventfulGCObj(mgr)
962        future = self.executor.submit(id, obj)
963        future.result()
964
965        self.assertTrue(obj.event.wait(timeout=1))
966
967        # explicitly destroy the object to ensure that EventfulGCObj.__del__()
968        # is called while manager is still running.
969        obj = None
970        support.gc_collect()
971
972        mgr.shutdown()
973        mgr.join()
974
975    def test_saturation(self):
976        executor = self.executor_type(4)
977        mp_context = get_context()
978        sem = mp_context.Semaphore(0)
979        job_count = 15 * executor._max_workers
980        try:
981            for _ in range(job_count):
982                executor.submit(sem.acquire)
983            self.assertEqual(len(executor._processes), executor._max_workers)
984            for _ in range(job_count):
985                sem.release()
986        finally:
987            executor.shutdown()
988
989    def test_idle_process_reuse_one(self):
990        executor = self.executor_type(4)
991        executor.submit(mul, 21, 2).result()
992        executor.submit(mul, 6, 7).result()
993        executor.submit(mul, 3, 14).result()
994        self.assertEqual(len(executor._processes), 1)
995        executor.shutdown()
996
997    def test_idle_process_reuse_multiple(self):
998        executor = self.executor_type(4)
999        executor.submit(mul, 12, 7).result()
1000        executor.submit(mul, 33, 25)
1001        executor.submit(mul, 25, 26).result()
1002        executor.submit(mul, 18, 29)
1003        self.assertLessEqual(len(executor._processes), 2)
1004        executor.shutdown()
1005
1006create_executor_tests(ProcessPoolExecutorTest,
1007                      executor_mixins=(ProcessPoolForkMixin,
1008                                       ProcessPoolForkserverMixin,
1009                                       ProcessPoolSpawnMixin))
1010
1011def _crash(delay=None):
1012    """Induces a segfault."""
1013    if delay:
1014        time.sleep(delay)
1015    import faulthandler
1016    faulthandler.disable()
1017    faulthandler._sigsegv()
1018
1019
1020def _exit():
1021    """Induces a sys exit with exitcode 1."""
1022    sys.exit(1)
1023
1024
1025def _raise_error(Err):
1026    """Function that raises an Exception in process."""
1027    raise Err()
1028
1029
1030def _raise_error_ignore_stderr(Err):
1031    """Function that raises an Exception in process and ignores stderr."""
1032    import io
1033    sys.stderr = io.StringIO()
1034    raise Err()
1035
1036
1037def _return_instance(cls):
1038    """Function that returns a instance of cls."""
1039    return cls()
1040
1041
1042class CrashAtPickle(object):
1043    """Bad object that triggers a segfault at pickling time."""
1044    def __reduce__(self):
1045        _crash()
1046
1047
1048class CrashAtUnpickle(object):
1049    """Bad object that triggers a segfault at unpickling time."""
1050    def __reduce__(self):
1051        return _crash, ()
1052
1053
1054class ExitAtPickle(object):
1055    """Bad object that triggers a process exit at pickling time."""
1056    def __reduce__(self):
1057        _exit()
1058
1059
1060class ExitAtUnpickle(object):
1061    """Bad object that triggers a process exit at unpickling time."""
1062    def __reduce__(self):
1063        return _exit, ()
1064
1065
1066class ErrorAtPickle(object):
1067    """Bad object that triggers an error at pickling time."""
1068    def __reduce__(self):
1069        from pickle import PicklingError
1070        raise PicklingError("Error in pickle")
1071
1072
1073class ErrorAtUnpickle(object):
1074    """Bad object that triggers an error at unpickling time."""
1075    def __reduce__(self):
1076        from pickle import UnpicklingError
1077        return _raise_error_ignore_stderr, (UnpicklingError, )
1078
1079
1080class ExecutorDeadlockTest:
1081    TIMEOUT = support.SHORT_TIMEOUT
1082
1083    def _fail_on_deadlock(self, executor):
1084        # If we did not recover before TIMEOUT seconds, consider that the
1085        # executor is in a deadlock state and forcefully clean all its
1086        # composants.
1087        import faulthandler
1088        from tempfile import TemporaryFile
1089        with TemporaryFile(mode="w+") as f:
1090            faulthandler.dump_traceback(file=f)
1091            f.seek(0)
1092            tb = f.read()
1093        for p in executor._processes.values():
1094            p.terminate()
1095        # This should be safe to call executor.shutdown here as all possible
1096        # deadlocks should have been broken.
1097        executor.shutdown(wait=True)
1098        print(f"\nTraceback:\n {tb}", file=sys.__stderr__)
1099        self.fail(f"Executor deadlock:\n\n{tb}")
1100
1101
1102    def _check_crash(self, error, func, *args, ignore_stderr=False):
1103        # test for deadlock caused by crashes in a pool
1104        self.executor.shutdown(wait=True)
1105
1106        executor = self.executor_type(
1107            max_workers=2, mp_context=get_context(self.ctx))
1108        res = executor.submit(func, *args)
1109
1110        if ignore_stderr:
1111            cm = support.captured_stderr()
1112        else:
1113            cm = contextlib.nullcontext()
1114
1115        try:
1116            with self.assertRaises(error):
1117                with cm:
1118                    res.result(timeout=self.TIMEOUT)
1119        except futures.TimeoutError:
1120            # If we did not recover before TIMEOUT seconds,
1121            # consider that the executor is in a deadlock state
1122            self._fail_on_deadlock(executor)
1123        executor.shutdown(wait=True)
1124
1125    def test_error_at_task_pickle(self):
1126        # Check problem occurring while pickling a task in
1127        # the task_handler thread
1128        self._check_crash(PicklingError, id, ErrorAtPickle())
1129
1130    def test_exit_at_task_unpickle(self):
1131        # Check problem occurring while unpickling a task on workers
1132        self._check_crash(BrokenProcessPool, id, ExitAtUnpickle())
1133
1134    def test_error_at_task_unpickle(self):
1135        # Check problem occurring while unpickling a task on workers
1136        self._check_crash(BrokenProcessPool, id, ErrorAtUnpickle())
1137
1138    def test_crash_at_task_unpickle(self):
1139        # Check problem occurring while unpickling a task on workers
1140        self._check_crash(BrokenProcessPool, id, CrashAtUnpickle())
1141
1142    def test_crash_during_func_exec_on_worker(self):
1143        # Check problem occurring during func execution on workers
1144        self._check_crash(BrokenProcessPool, _crash)
1145
1146    def test_exit_during_func_exec_on_worker(self):
1147        # Check problem occurring during func execution on workers
1148        self._check_crash(SystemExit, _exit)
1149
1150    def test_error_during_func_exec_on_worker(self):
1151        # Check problem occurring during func execution on workers
1152        self._check_crash(RuntimeError, _raise_error, RuntimeError)
1153
1154    def test_crash_during_result_pickle_on_worker(self):
1155        # Check problem occurring while pickling a task result
1156        # on workers
1157        self._check_crash(BrokenProcessPool, _return_instance, CrashAtPickle)
1158
1159    def test_exit_during_result_pickle_on_worker(self):
1160        # Check problem occurring while pickling a task result
1161        # on workers
1162        self._check_crash(SystemExit, _return_instance, ExitAtPickle)
1163
1164    def test_error_during_result_pickle_on_worker(self):
1165        # Check problem occurring while pickling a task result
1166        # on workers
1167        self._check_crash(PicklingError, _return_instance, ErrorAtPickle)
1168
1169    def test_error_during_result_unpickle_in_result_handler(self):
1170        # Check problem occurring while unpickling a task in
1171        # the result_handler thread
1172        self._check_crash(BrokenProcessPool,
1173                          _return_instance, ErrorAtUnpickle,
1174                          ignore_stderr=True)
1175
1176    def test_exit_during_result_unpickle_in_result_handler(self):
1177        # Check problem occurring while unpickling a task in
1178        # the result_handler thread
1179        self._check_crash(BrokenProcessPool, _return_instance, ExitAtUnpickle)
1180
1181    def test_shutdown_deadlock(self):
1182        # Test that the pool calling shutdown do not cause deadlock
1183        # if a worker fails after the shutdown call.
1184        self.executor.shutdown(wait=True)
1185        with self.executor_type(max_workers=2,
1186                                mp_context=get_context(self.ctx)) as executor:
1187            self.executor = executor  # Allow clean up in fail_on_deadlock
1188            f = executor.submit(_crash, delay=.1)
1189            executor.shutdown(wait=True)
1190            with self.assertRaises(BrokenProcessPool):
1191                f.result()
1192
1193    def test_shutdown_deadlock_pickle(self):
1194        # Test that the pool calling shutdown with wait=False does not cause
1195        # a deadlock if a task fails at pickle after the shutdown call.
1196        # Reported in bpo-39104.
1197        self.executor.shutdown(wait=True)
1198        with self.executor_type(max_workers=2,
1199                                mp_context=get_context(self.ctx)) as executor:
1200            self.executor = executor  # Allow clean up in fail_on_deadlock
1201
1202            # Start the executor and get the executor_manager_thread to collect
1203            # the threads and avoid dangling thread that should be cleaned up
1204            # asynchronously.
1205            executor.submit(id, 42).result()
1206            executor_manager = executor._executor_manager_thread
1207
1208            # Submit a task that fails at pickle and shutdown the executor
1209            # without waiting
1210            f = executor.submit(id, ErrorAtPickle())
1211            executor.shutdown(wait=False)
1212            with self.assertRaises(PicklingError):
1213                f.result()
1214
1215        # Make sure the executor is eventually shutdown and do not leave
1216        # dangling threads
1217        executor_manager.join()
1218
1219
1220create_executor_tests(ExecutorDeadlockTest,
1221                      executor_mixins=(ProcessPoolForkMixin,
1222                                       ProcessPoolForkserverMixin,
1223                                       ProcessPoolSpawnMixin))
1224
1225
1226class FutureTests(BaseTestCase):
1227    def test_done_callback_with_result(self):
1228        callback_result = None
1229        def fn(callback_future):
1230            nonlocal callback_result
1231            callback_result = callback_future.result()
1232
1233        f = Future()
1234        f.add_done_callback(fn)
1235        f.set_result(5)
1236        self.assertEqual(5, callback_result)
1237
1238    def test_done_callback_with_exception(self):
1239        callback_exception = None
1240        def fn(callback_future):
1241            nonlocal callback_exception
1242            callback_exception = callback_future.exception()
1243
1244        f = Future()
1245        f.add_done_callback(fn)
1246        f.set_exception(Exception('test'))
1247        self.assertEqual(('test',), callback_exception.args)
1248
1249    def test_done_callback_with_cancel(self):
1250        was_cancelled = None
1251        def fn(callback_future):
1252            nonlocal was_cancelled
1253            was_cancelled = callback_future.cancelled()
1254
1255        f = Future()
1256        f.add_done_callback(fn)
1257        self.assertTrue(f.cancel())
1258        self.assertTrue(was_cancelled)
1259
1260    def test_done_callback_raises(self):
1261        with support.captured_stderr() as stderr:
1262            raising_was_called = False
1263            fn_was_called = False
1264
1265            def raising_fn(callback_future):
1266                nonlocal raising_was_called
1267                raising_was_called = True
1268                raise Exception('doh!')
1269
1270            def fn(callback_future):
1271                nonlocal fn_was_called
1272                fn_was_called = True
1273
1274            f = Future()
1275            f.add_done_callback(raising_fn)
1276            f.add_done_callback(fn)
1277            f.set_result(5)
1278            self.assertTrue(raising_was_called)
1279            self.assertTrue(fn_was_called)
1280            self.assertIn('Exception: doh!', stderr.getvalue())
1281
1282    def test_done_callback_already_successful(self):
1283        callback_result = None
1284        def fn(callback_future):
1285            nonlocal callback_result
1286            callback_result = callback_future.result()
1287
1288        f = Future()
1289        f.set_result(5)
1290        f.add_done_callback(fn)
1291        self.assertEqual(5, callback_result)
1292
1293    def test_done_callback_already_failed(self):
1294        callback_exception = None
1295        def fn(callback_future):
1296            nonlocal callback_exception
1297            callback_exception = callback_future.exception()
1298
1299        f = Future()
1300        f.set_exception(Exception('test'))
1301        f.add_done_callback(fn)
1302        self.assertEqual(('test',), callback_exception.args)
1303
1304    def test_done_callback_already_cancelled(self):
1305        was_cancelled = None
1306        def fn(callback_future):
1307            nonlocal was_cancelled
1308            was_cancelled = callback_future.cancelled()
1309
1310        f = Future()
1311        self.assertTrue(f.cancel())
1312        f.add_done_callback(fn)
1313        self.assertTrue(was_cancelled)
1314
1315    def test_done_callback_raises_already_succeeded(self):
1316        with support.captured_stderr() as stderr:
1317            def raising_fn(callback_future):
1318                raise Exception('doh!')
1319
1320            f = Future()
1321
1322            # Set the result first to simulate a future that runs instantly,
1323            # effectively allowing the callback to be run immediately.
1324            f.set_result(5)
1325            f.add_done_callback(raising_fn)
1326
1327            self.assertIn('exception calling callback for', stderr.getvalue())
1328            self.assertIn('doh!', stderr.getvalue())
1329
1330
1331    def test_repr(self):
1332        self.assertRegex(repr(PENDING_FUTURE),
1333                         '<Future at 0x[0-9a-f]+ state=pending>')
1334        self.assertRegex(repr(RUNNING_FUTURE),
1335                         '<Future at 0x[0-9a-f]+ state=running>')
1336        self.assertRegex(repr(CANCELLED_FUTURE),
1337                         '<Future at 0x[0-9a-f]+ state=cancelled>')
1338        self.assertRegex(repr(CANCELLED_AND_NOTIFIED_FUTURE),
1339                         '<Future at 0x[0-9a-f]+ state=cancelled>')
1340        self.assertRegex(
1341                repr(EXCEPTION_FUTURE),
1342                '<Future at 0x[0-9a-f]+ state=finished raised OSError>')
1343        self.assertRegex(
1344                repr(SUCCESSFUL_FUTURE),
1345                '<Future at 0x[0-9a-f]+ state=finished returned int>')
1346
1347
1348    def test_cancel(self):
1349        f1 = create_future(state=PENDING)
1350        f2 = create_future(state=RUNNING)
1351        f3 = create_future(state=CANCELLED)
1352        f4 = create_future(state=CANCELLED_AND_NOTIFIED)
1353        f5 = create_future(state=FINISHED, exception=OSError())
1354        f6 = create_future(state=FINISHED, result=5)
1355
1356        self.assertTrue(f1.cancel())
1357        self.assertEqual(f1._state, CANCELLED)
1358
1359        self.assertFalse(f2.cancel())
1360        self.assertEqual(f2._state, RUNNING)
1361
1362        self.assertTrue(f3.cancel())
1363        self.assertEqual(f3._state, CANCELLED)
1364
1365        self.assertTrue(f4.cancel())
1366        self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED)
1367
1368        self.assertFalse(f5.cancel())
1369        self.assertEqual(f5._state, FINISHED)
1370
1371        self.assertFalse(f6.cancel())
1372        self.assertEqual(f6._state, FINISHED)
1373
1374    def test_cancelled(self):
1375        self.assertFalse(PENDING_FUTURE.cancelled())
1376        self.assertFalse(RUNNING_FUTURE.cancelled())
1377        self.assertTrue(CANCELLED_FUTURE.cancelled())
1378        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled())
1379        self.assertFalse(EXCEPTION_FUTURE.cancelled())
1380        self.assertFalse(SUCCESSFUL_FUTURE.cancelled())
1381
1382    def test_done(self):
1383        self.assertFalse(PENDING_FUTURE.done())
1384        self.assertFalse(RUNNING_FUTURE.done())
1385        self.assertTrue(CANCELLED_FUTURE.done())
1386        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done())
1387        self.assertTrue(EXCEPTION_FUTURE.done())
1388        self.assertTrue(SUCCESSFUL_FUTURE.done())
1389
1390    def test_running(self):
1391        self.assertFalse(PENDING_FUTURE.running())
1392        self.assertTrue(RUNNING_FUTURE.running())
1393        self.assertFalse(CANCELLED_FUTURE.running())
1394        self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running())
1395        self.assertFalse(EXCEPTION_FUTURE.running())
1396        self.assertFalse(SUCCESSFUL_FUTURE.running())
1397
1398    def test_result_with_timeout(self):
1399        self.assertRaises(futures.TimeoutError,
1400                          PENDING_FUTURE.result, timeout=0)
1401        self.assertRaises(futures.TimeoutError,
1402                          RUNNING_FUTURE.result, timeout=0)
1403        self.assertRaises(futures.CancelledError,
1404                          CANCELLED_FUTURE.result, timeout=0)
1405        self.assertRaises(futures.CancelledError,
1406                          CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0)
1407        self.assertRaises(OSError, EXCEPTION_FUTURE.result, timeout=0)
1408        self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)
1409
1410    def test_result_with_success(self):
1411        # TODO(brian@sweetapp.com): This test is timing dependent.
1412        def notification():
1413            # Wait until the main thread is waiting for the result.
1414            time.sleep(1)
1415            f1.set_result(42)
1416
1417        f1 = create_future(state=PENDING)
1418        t = threading.Thread(target=notification)
1419        t.start()
1420
1421        self.assertEqual(f1.result(timeout=5), 42)
1422        t.join()
1423
1424    def test_result_with_cancel(self):
1425        # TODO(brian@sweetapp.com): This test is timing dependent.
1426        def notification():
1427            # Wait until the main thread is waiting for the result.
1428            time.sleep(1)
1429            f1.cancel()
1430
1431        f1 = create_future(state=PENDING)
1432        t = threading.Thread(target=notification)
1433        t.start()
1434
1435        self.assertRaises(futures.CancelledError,
1436                          f1.result, timeout=support.SHORT_TIMEOUT)
1437        t.join()
1438
1439    def test_exception_with_timeout(self):
1440        self.assertRaises(futures.TimeoutError,
1441                          PENDING_FUTURE.exception, timeout=0)
1442        self.assertRaises(futures.TimeoutError,
1443                          RUNNING_FUTURE.exception, timeout=0)
1444        self.assertRaises(futures.CancelledError,
1445                          CANCELLED_FUTURE.exception, timeout=0)
1446        self.assertRaises(futures.CancelledError,
1447                          CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0)
1448        self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0),
1449                                   OSError))
1450        self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None)
1451
1452    def test_exception_with_success(self):
1453        def notification():
1454            # Wait until the main thread is waiting for the exception.
1455            time.sleep(1)
1456            with f1._condition:
1457                f1._state = FINISHED
1458                f1._exception = OSError()
1459                f1._condition.notify_all()
1460
1461        f1 = create_future(state=PENDING)
1462        t = threading.Thread(target=notification)
1463        t.start()
1464
1465        self.assertTrue(isinstance(f1.exception(timeout=support.SHORT_TIMEOUT), OSError))
1466        t.join()
1467
1468    def test_multiple_set_result(self):
1469        f = create_future(state=PENDING)
1470        f.set_result(1)
1471
1472        with self.assertRaisesRegex(
1473                futures.InvalidStateError,
1474                'FINISHED: <Future at 0x[0-9a-f]+ '
1475                'state=finished returned int>'
1476        ):
1477            f.set_result(2)
1478
1479        self.assertTrue(f.done())
1480        self.assertEqual(f.result(), 1)
1481
1482    def test_multiple_set_exception(self):
1483        f = create_future(state=PENDING)
1484        e = ValueError()
1485        f.set_exception(e)
1486
1487        with self.assertRaisesRegex(
1488                futures.InvalidStateError,
1489                'FINISHED: <Future at 0x[0-9a-f]+ '
1490                'state=finished raised ValueError>'
1491        ):
1492            f.set_exception(Exception())
1493
1494        self.assertEqual(f.exception(), e)
1495
1496
1497_threads_key = None
1498
1499def setUpModule():
1500    global _threads_key
1501    _threads_key = support.threading_setup()
1502
1503
1504def tearDownModule():
1505    support.threading_cleanup(*_threads_key)
1506    multiprocessing.util._cleanup_tests()
1507
1508
1509if __name__ == "__main__":
1510    unittest.main()
1511