# Test suite for concurrent.futures (from CPython's test suite).
1from test import support
2from test.support import import_helper
3from test.support import threading_helper
4
5# Skip tests if _multiprocessing wasn't built.
6import_helper.import_module('_multiprocessing')
7
8from test.support import hashlib_helper
9from test.support.script_helper import assert_python_ok
10
11import contextlib
12import itertools
13import logging
14from logging.handlers import QueueHandler
15import os
16import queue
17import sys
18import threading
19import time
20import unittest
21import weakref
22from pickle import PicklingError
23
24from concurrent import futures
25from concurrent.futures._base import (
26    PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future,
27    BrokenExecutor)
28from concurrent.futures.process import BrokenProcessPool, _check_system_limits
29from multiprocessing import get_context
30
31import multiprocessing.process
32import multiprocessing.util
33
34
def create_future(state=PENDING, exception=None, result=None):
    """Fabricate a Future forced into the given internal state.

    Bypasses the normal Future state machine so tests can build futures
    in arbitrary states (pending, cancelled, finished, ...).
    """
    future = Future()
    future._state = state
    future._exception = exception
    future._result = result
    return future
41
42
# Pre-built futures, one per possible state, shared by many tests below.
PENDING_FUTURE = create_future(state=PENDING)
RUNNING_FUTURE = create_future(state=RUNNING)
CANCELLED_FUTURE = create_future(state=CANCELLED)
CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
EXCEPTION_FUTURE = create_future(state=FINISHED, exception=OSError())
SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)

# Module-level flag mutated by init(); worker threads/processes read it via
# get_init_status() so tests can tell whether their initializer ran.
INITIALIZER_STATUS = 'uninitialized'
51
52
def mul(x, y):
    """Return the product of *x* and *y* (a simple picklable workload)."""
    product = x * y
    return product
55
def capture(*args, **kwargs):
    """Echo back exactly the positional and keyword arguments received."""
    return (args, kwargs)
58
def sleep_and_raise(t):
    """Sleep for *t* seconds, then raise a generic Exception."""
    time.sleep(t)
    message = 'this is an exception'
    raise Exception(message)
62
def sleep_and_print(t, msg):
    """Sleep for *t* seconds, print *msg*, and flush stdout immediately."""
    time.sleep(t)
    # Flush right away so the parent process sees the output even if the
    # interpreter is torn down abruptly afterwards.
    print(msg, flush=True)
67
def init(x):
    """Executor-worker initializer: record *x* in INITIALIZER_STATUS."""
    global INITIALIZER_STATUS
    INITIALIZER_STATUS = x
71
def get_init_status():
    """Report the current value of the INITIALIZER_STATUS flag."""
    return INITIALIZER_STATUS
74
def init_fail(log_queue=None):
    """Worker initializer that always fails with ValueError.

    When *log_queue* is supplied (process pools), the 'concurrent.futures'
    logger's CRITICAL records are routed into the queue so the parent
    process can inspect them.
    """
    if log_queue is not None:
        log = logging.getLogger('concurrent.futures')
        log.setLevel('CRITICAL')
        log.propagate = False
        log.addHandler(QueueHandler(log_queue))
    time.sleep(0.1)  # let some futures be scheduled
    raise ValueError('error in initializer')
83
84
class MyObject(object):
    """Trivial picklable object used to track reference lifetimes."""

    def my_method(self):
        """Do nothing; exists so a bound method can be submitted."""
        return None
88
89
class EventfulGCObj():
    """Object whose garbage collection is observable via a manager Event.

    __del__ sets the event, letting a test wait until the instance has
    actually been collected — even when that happens in a worker process.
    """

    def __init__(self, mgr):
        # mgr is a multiprocessing Manager; its Event proxy can be shared
        # across processes.
        self.event = mgr.Event()

    def __del__(self):
        self.event.set()
96
97
def make_dummy_object(_):
    """Map-friendly factory: ignore the argument and return a MyObject."""
    return MyObject()
100
101
class BaseTestCase(unittest.TestCase):
    """Common setUp/tearDown: track threads and reap child processes."""

    def setUp(self):
        # Snapshot the live-thread state so tearDown can verify that the
        # test left no stray threads behind.
        self._thread_key = threading_helper.threading_setup()

    def tearDown(self):
        support.reap_children()
        threading_helper.threading_cleanup(*self._thread_key)
109
110
class ExecutorMixin:
    """Create and destroy an executor around each test.

    Subclasses set executor_type (and, for process pools, a ctx start-method
    name). setUp builds the executor and primes its workers; tearDown shuts
    it down and enforces a coarse wall-clock limit on the test.
    """
    worker_count = 5
    executor_kwargs = {}

    def setUp(self):
        super().setUp()

        self.t1 = time.monotonic()
        if hasattr(self, "ctx"):
            # Process pool: pass the requested multiprocessing context.
            self.executor = self.executor_type(
                max_workers=self.worker_count,
                mp_context=self.get_context(),
                **self.executor_kwargs)
        else:
            # Thread pool: no multiprocessing context applies.
            self.executor = self.executor_type(
                max_workers=self.worker_count,
                **self.executor_kwargs)
        self._prime_executor()

    def tearDown(self):
        self.executor.shutdown(wait=True)
        self.executor = None

        dt = time.monotonic() - self.t1
        if support.verbose:
            print("%.2fs" % dt, end=' ')
        self.assertLess(dt, 300, "synchronization issue: test lasted too long")

        super().tearDown()

    def get_context(self):
        # Resolve the start-method name (self.ctx) to a context object.
        return get_context(self.ctx)

    def _prime_executor(self):
        # Make sure that the executor is ready to do work before running the
        # tests. This should reduce the probability of timeouts in the tests.
        futures = [self.executor.submit(time.sleep, 0.1)
                   for _ in range(self.worker_count)]
        for f in futures:
            f.result()
151
152
class ThreadPoolMixin(ExecutorMixin):
    """Run the mixed-in tests against a ThreadPoolExecutor."""
    executor_type = futures.ThreadPoolExecutor
155
156
class ProcessPoolForkMixin(ExecutorMixin):
    """Run the mixed-in tests against a "fork" ProcessPoolExecutor (POSIX)."""
    executor_type = futures.ProcessPoolExecutor
    ctx = "fork"

    def get_context(self):
        # Skip early when process pools cannot work here at all, or when
        # the platform does not support the fork start method.
        try:
            _check_system_limits()
        except NotImplementedError:
            self.skipTest("ProcessPoolExecutor unavailable on this system")
        if sys.platform == "win32":
            self.skipTest("require unix system")
        return super().get_context()
169
170
class ProcessPoolSpawnMixin(ExecutorMixin):
    """Run the mixed-in tests against a "spawn" ProcessPoolExecutor."""
    executor_type = futures.ProcessPoolExecutor
    ctx = "spawn"

    def get_context(self):
        # Spawn is available on every platform; only check system limits.
        try:
            _check_system_limits()
        except NotImplementedError:
            self.skipTest("ProcessPoolExecutor unavailable on this system")
        return super().get_context()
181
182
class ProcessPoolForkserverMixin(ExecutorMixin):
    """Run the mixed-in tests against a "forkserver" ProcessPoolExecutor (POSIX)."""
    executor_type = futures.ProcessPoolExecutor
    ctx = "forkserver"

    def get_context(self):
        # Skip early when process pools cannot work here at all, or when
        # the platform does not support the forkserver start method.
        try:
            _check_system_limits()
        except NotImplementedError:
            self.skipTest("ProcessPoolExecutor unavailable on this system")
        if sys.platform == "win32":
            self.skipTest("require unix system")
        return super().get_context()
195
196
def create_executor_tests(mixin, bases=(BaseTestCase,),
                          executor_mixins=(ThreadPoolMixin,
                                           ProcessPoolForkMixin,
                                           ProcessPoolForkserverMixin,
                                           ProcessPoolSpawnMixin)):
    """Generate one concrete TestCase per executor flavour.

    For every entry in *executor_mixins*, builds a class named
    "<Executor><Mixin>Test" combining *mixin*, the executor mixin and
    *bases*, and installs it in this module's globals so unittest
    discovery picks it up.
    """
    def strip_mixin(name):
        # Drop a trailing 'Mixin'/'Tests' (5 chars) or 'Test' (4 chars);
        # the order matters so 'Tests' is matched before 'Test'.
        for suffix in ('Mixin', 'Tests', 'Test'):
            if name.endswith(suffix):
                return name[:-len(suffix)]
        return name

    for exe in executor_mixins:
        name = "%s%sTest" % (strip_mixin(exe.__name__),
                             strip_mixin(mixin.__name__))
        globals()[name] = type(name, (mixin, exe) + bases, {})
215
216
class InitializerMixin(ExecutorMixin):
    """Check that a successful initializer runs in every worker."""
    worker_count = 2

    def setUp(self):
        global INITIALIZER_STATUS
        # Reset the flag so each test observes its own initializer run.
        INITIALIZER_STATUS = 'uninitialized'
        self.executor_kwargs = dict(initializer=init,
                                    initargs=('initialized',))
        super().setUp()

    def test_initializer(self):
        # One task per worker: each worker should report the value that
        # init() stored during initialization.
        futures = [self.executor.submit(get_init_status)
                   for _ in range(self.worker_count)]

        for f in futures:
            self.assertEqual(f.result(), 'initialized')
233
234
class FailingInitializerMixin(ExecutorMixin):
    """Check that a failing initializer breaks the executor.

    The initializer's failure is logged to the 'concurrent.futures'
    logger; for process pools the child's log records are shipped back to
    the parent through a queue (see init_fail and _assert_logged).
    """
    worker_count = 2

    def setUp(self):
        if hasattr(self, "ctx"):
            # Pass a queue to redirect the child's logging output
            self.mp_context = self.get_context()
            self.log_queue = self.mp_context.Queue()
            self.executor_kwargs = dict(initializer=init_fail,
                                        initargs=(self.log_queue,))
        else:
            # In a thread pool, the child shares our logging setup
            # (see _assert_logged())
            self.mp_context = None
            self.log_queue = None
            self.executor_kwargs = dict(initializer=init_fail)
        super().setUp()

    def test_initializer(self):
        with self._assert_logged('ValueError: error in initializer'):
            try:
                future = self.executor.submit(get_init_status)
            except BrokenExecutor:
                # Perhaps the executor is already broken
                pass
            else:
                with self.assertRaises(BrokenExecutor):
                    future.result()
            # At some point, the executor should break
            t1 = time.monotonic()
            while not self.executor._broken:
                if time.monotonic() - t1 > 5:
                    self.fail("executor not broken after 5 s.")
                time.sleep(0.01)
            # ... and from this point submit() is guaranteed to fail
            with self.assertRaises(BrokenExecutor):
                self.executor.submit(get_init_status)

    def _prime_executor(self):
        # Priming would only trip over the failing initializer; skip it.
        pass

    @contextlib.contextmanager
    def _assert_logged(self, msg):
        # Assert that *msg* appears in some log line emitted while the
        # with-block runs, whichever transport the records took.
        if self.log_queue is not None:
            # Process pool: drain the records the child forwarded.
            yield
            output = []
            try:
                while True:
                    output.append(self.log_queue.get_nowait().getMessage())
            except queue.Empty:
                pass
        else:
            # Thread pool: the records are emitted in this process.
            with self.assertLogs('concurrent.futures', 'CRITICAL') as cm:
                yield
            output = cm.output
        self.assertTrue(any(msg in line for line in output),
                        output)
292
293
# Materialize Thread/Fork/Forkserver/Spawn variants of the initializer tests.
create_executor_tests(InitializerMixin)
create_executor_tests(FailingInitializerMixin)
296
297
class ExecutorShutdownTest:
    """Shutdown semantics shared by thread- and process-based executors.

    Mixed into concrete classes together with an ExecutorMixin subclass,
    which provides self.executor and self.executor_type.
    """

    def test_run_after_shutdown(self):
        # submit() after shutdown() must be rejected.
        self.executor.shutdown()
        self.assertRaises(RuntimeError,
                          self.executor.submit,
                          pow, 2, 5)

    def test_interpreter_shutdown(self):
        # Test the atexit hook for shutdown of worker threads and processes
        rc, out, err = assert_python_ok('-c', """if 1:
            from concurrent.futures import {executor_type}
            from time import sleep
            from test.test_concurrent_futures import sleep_and_print
            if __name__ == "__main__":
                context = '{context}'
                if context == "":
                    t = {executor_type}(5)
                else:
                    from multiprocessing import get_context
                    context = get_context(context)
                    t = {executor_type}(5, mp_context=context)
                t.submit(sleep_and_print, 1.0, "apple")
            """.format(executor_type=self.executor_type.__name__,
                       context=getattr(self, "ctx", "")))
        # Errors in atexit hooks don't change the process exit code, check
        # stderr manually.
        self.assertFalse(err)
        self.assertEqual(out.strip(), b"apple")

    def test_submit_after_interpreter_shutdown(self):
        # Test the atexit hook for shutdown of worker threads and processes
        rc, out, err = assert_python_ok('-c', """if 1:
            import atexit
            @atexit.register
            def run_last():
                try:
                    t.submit(id, None)
                except RuntimeError:
                    print("runtime-error")
                    raise
            from concurrent.futures import {executor_type}
            if __name__ == "__main__":
                context = '{context}'
                if not context:
                    t = {executor_type}(5)
                else:
                    from multiprocessing import get_context
                    context = get_context(context)
                    t = {executor_type}(5, mp_context=context)
                    t.submit(id, 42).result()
            """.format(executor_type=self.executor_type.__name__,
                       context=getattr(self, "ctx", "")))
        # Errors in atexit hooks don't change the process exit code, check
        # stderr manually.
        self.assertIn("RuntimeError: cannot schedule new futures", err.decode())
        self.assertEqual(out.strip(), b"runtime-error")

    def test_hang_issue12364(self):
        # Waiting on results after shutdown() must not hang.
        fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
        self.executor.shutdown()
        for f in fs:
            f.result()

    def test_cancel_futures(self):
        executor = self.executor_type(max_workers=3)
        fs = [executor.submit(time.sleep, .1) for _ in range(50)]
        executor.shutdown(cancel_futures=True)
        # We can't guarantee the exact number of cancellations, but we can
        # guarantee that *some* were cancelled. With setting max_workers to 3,
        # most of the submitted futures should have been cancelled.
        cancelled = [fut for fut in fs if fut.cancelled()]
        self.assertTrue(len(cancelled) >= 35, msg=f"{len(cancelled)=}")

        # Ensure the other futures were able to finish.
        # Use "not fut.cancelled()" instead of "fut.done()" to include futures
        # that may have been left in a pending state.
        others = [fut for fut in fs if not fut.cancelled()]
        for fut in others:
            self.assertTrue(fut.done(), msg=f"{fut._state=}")
            self.assertIsNone(fut.exception())

        # Similar to the number of cancelled futures, we can't guarantee the
        # exact number that completed. But, we can guarantee that at least
        # one finished.
        self.assertTrue(len(others) > 0, msg=f"{len(others)=}")

    def test_hang_issue39205(self):
        """shutdown(wait=False) doesn't hang at exit with running futures.

        See https://bugs.python.org/issue39205.
        """
        if self.executor_type == futures.ProcessPoolExecutor:
            raise unittest.SkipTest(
                "Hangs due to https://bugs.python.org/issue39205")

        rc, out, err = assert_python_ok('-c', """if True:
            from concurrent.futures import {executor_type}
            from test.test_concurrent_futures import sleep_and_print
            if __name__ == "__main__":
                t = {executor_type}(max_workers=3)
                t.submit(sleep_and_print, 1.0, "apple")
                t.shutdown(wait=False)
            """.format(executor_type=self.executor_type.__name__))
        self.assertFalse(err)
        self.assertEqual(out.strip(), b"apple")
403
404
class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase):
    """Shutdown behavior specific to ThreadPoolExecutor."""

    def _prime_executor(self):
        # Shutdown tests manage worker startup themselves.
        pass

    def test_threads_terminate(self):
        def acquire_lock(lock):
            lock.acquire()

        # Block all three workers on a semaphore, then release them and
        # verify every worker thread exits after shutdown().
        sem = threading.Semaphore(0)
        for i in range(3):
            self.executor.submit(acquire_lock, sem)
        self.assertEqual(len(self.executor._threads), 3)
        for i in range(3):
            sem.release()
        self.executor.shutdown()
        for t in self.executor._threads:
            t.join()

    def test_context_manager_shutdown(self):
        with futures.ThreadPoolExecutor(max_workers=5) as e:
            executor = e
            self.assertEqual(list(e.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        # Exiting the context manager must have shut the workers down.
        for t in executor._threads:
            t.join()

    def test_del_shutdown(self):
        executor = futures.ThreadPoolExecutor(max_workers=5)
        res = executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor

        for t in threads:
            t.join()

        # Make sure the results were all computed before the executor got
        # shutdown.  Use a real unittest assertion: a bare `assert` is
        # stripped when running under python -O.
        self.assertEqual(list(res), [abs(v) for v in range(-5, 5)])

    def test_shutdown_no_wait(self):
        # Ensure that the executor cleans up the threads when calling
        # shutdown with wait=False
        executor = futures.ThreadPoolExecutor(max_workers=5)
        res = executor.map(abs, range(-5, 5))
        threads = executor._threads
        executor.shutdown(wait=False)
        for t in threads:
            t.join()

        # Make sure the results were all computed before the executor got
        # shutdown (unittest assertion: bare `assert` is stripped under -O).
        self.assertEqual(list(res), [abs(v) for v in range(-5, 5)])

    def test_thread_names_assigned(self):
        executor = futures.ThreadPoolExecutor(
            max_workers=5, thread_name_prefix='SpecialPool')
        executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor
        support.gc_collect()  # For PyPy or other GCs.

        for t in threads:
            self.assertRegex(t.name, r'^SpecialPool_[0-4]$')
            t.join()

    def test_thread_names_default(self):
        executor = futures.ThreadPoolExecutor(max_workers=5)
        executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor
        support.gc_collect()  # For PyPy or other GCs.

        for t in threads:
            # Ensure that our default name is reasonably sane and unique when
            # no thread_name_prefix was supplied.
            self.assertRegex(t.name, r'ThreadPoolExecutor-\d+_[0-4]$')
            t.join()

    def test_cancel_futures_wait_false(self):
        # Can only be reliably tested for TPE, since PPE often hangs with
        # `wait=False` (even without *cancel_futures*).
        # Note: the script below is a plain string with no format fields;
        # the vestigial .format() call that used to follow it was removed.
        rc, out, err = assert_python_ok('-c', """if True:
            from concurrent.futures import ThreadPoolExecutor
            from test.test_concurrent_futures import sleep_and_print
            if __name__ == "__main__":
                t = ThreadPoolExecutor()
                t.submit(sleep_and_print, .1, "apple")
                t.shutdown(wait=False, cancel_futures=True)
            """)
        # Errors in atexit hooks don't change the process exit code, check
        # stderr manually.
        self.assertFalse(err)
        self.assertEqual(out.strip(), b"apple")
500
501
class ProcessPoolShutdownTest(ExecutorShutdownTest):
    """Shutdown behavior specific to ProcessPoolExecutor.

    Combined with the ProcessPool*Mixin classes by create_executor_tests(),
    which provide self.executor, self.executor_type and self.get_context().
    """

    def _prime_executor(self):
        # Shutdown tests manage worker startup themselves.
        pass

    def test_processes_terminate(self):
        def acquire_lock(lock):
            lock.acquire()

        # Use the executor's own start method so the semaphore is
        # compatible with its worker processes: the module-level default
        # context may differ from self.ctx (fork vs forkserver vs spawn).
        mp_context = self.get_context()
        sem = mp_context.Semaphore(0)
        for _ in range(3):
            self.executor.submit(acquire_lock, sem)
        self.assertEqual(len(self.executor._processes), 3)
        for _ in range(3):
            sem.release()
        processes = self.executor._processes
        self.executor.shutdown()

        for p in processes.values():
            p.join()

    def test_context_manager_shutdown(self):
        with futures.ProcessPoolExecutor(max_workers=5) as e:
            processes = e._processes
            self.assertEqual(list(e.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        # Exiting the context manager must have shut the workers down.
        for p in processes.values():
            p.join()

    def test_del_shutdown(self):
        executor = futures.ProcessPoolExecutor(max_workers=5)
        res = executor.map(abs, range(-5, 5))
        # Grab references before dropping the executor (the original code
        # assigned executor_manager_thread twice; once is enough).
        executor_manager_thread = executor._executor_manager_thread
        processes = executor._processes
        call_queue = executor._call_queue
        del executor
        support.gc_collect()  # For PyPy or other GCs.

        # Make sure that all the executor resources were properly cleaned by
        # the shutdown process
        executor_manager_thread.join()
        for p in processes.values():
            p.join()
        call_queue.join_thread()

        # Make sure the results were all computed before the executor got
        # shutdown.  Use a real unittest assertion: a bare `assert` is
        # stripped when running under python -O.
        self.assertEqual(list(res), [abs(v) for v in range(-5, 5)])

    def test_shutdown_no_wait(self):
        # Ensure that the executor cleans up the processes when calling
        # shutdown with wait=False
        executor = futures.ProcessPoolExecutor(max_workers=5)
        res = executor.map(abs, range(-5, 5))
        processes = executor._processes
        call_queue = executor._call_queue
        executor_manager_thread = executor._executor_manager_thread
        executor.shutdown(wait=False)

        # Make sure that all the executor resources were properly cleaned by
        # the shutdown process
        executor_manager_thread.join()
        for p in processes.values():
            p.join()
        call_queue.join_thread()

        # Make sure the results were all computed before the executor got
        # shutdown (unittest assertion: bare `assert` is stripped under -O).
        self.assertEqual(list(res), [abs(v) for v in range(-5, 5)])
573
574
# Process-pool-only shutdown variants (the thread pool has its own concrete
# class above, so ThreadPoolMixin is excluded here).
create_executor_tests(ProcessPoolShutdownTest,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))
579
580
class WaitTests:
    """Tests for futures.wait() across its return_when modes.

    Mixes live futures submitted to self.executor with the module-level
    pre-built futures (CANCELLED_FUTURE, SUCCESSFUL_FUTURE, ...).
    """

    def test_20369(self):
        # See https://bugs.python.org/issue20369
        # Duplicate futures in the input must be deduplicated in the result.
        future = self.executor.submit(time.sleep, 1.5)
        done, not_done = futures.wait([future, future],
                            return_when=futures.ALL_COMPLETED)
        self.assertEqual({future}, done)
        self.assertEqual(set(), not_done)


    def test_first_completed(self):
        future1 = self.executor.submit(mul, 21, 2)
        future2 = self.executor.submit(time.sleep, 1.5)

        done, not_done = futures.wait(
                [CANCELLED_FUTURE, future1, future2],
                 return_when=futures.FIRST_COMPLETED)

        self.assertEqual(set([future1]), done)
        self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)

    def test_first_completed_some_already_completed(self):
        future1 = self.executor.submit(time.sleep, 1.5)

        # wait() must return immediately because some inputs are already done.
        finished, pending = futures.wait(
                 [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
                 return_when=futures.FIRST_COMPLETED)

        self.assertEqual(
                set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
                finished)
        self.assertEqual(set([future1]), pending)

    def test_first_exception(self):
        future1 = self.executor.submit(mul, 2, 21)
        future2 = self.executor.submit(sleep_and_raise, 1.5)
        future3 = self.executor.submit(time.sleep, 3)

        # wait() should return once future2 raises, leaving future3 pending.
        finished, pending = futures.wait(
                [future1, future2, future3],
                return_when=futures.FIRST_EXCEPTION)

        self.assertEqual(set([future1, future2]), finished)
        self.assertEqual(set([future3]), pending)

    def test_first_exception_some_already_complete(self):
        future1 = self.executor.submit(divmod, 21, 0)
        future2 = self.executor.submit(time.sleep, 1.5)

        finished, pending = futures.wait(
                [SUCCESSFUL_FUTURE,
                 CANCELLED_FUTURE,
                 CANCELLED_AND_NOTIFIED_FUTURE,
                 future1, future2],
                return_when=futures.FIRST_EXCEPTION)

        self.assertEqual(set([SUCCESSFUL_FUTURE,
                              CANCELLED_AND_NOTIFIED_FUTURE,
                              future1]), finished)
        self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)

    def test_first_exception_one_already_failed(self):
        future1 = self.executor.submit(time.sleep, 2)

        # EXCEPTION_FUTURE already holds an exception: return immediately.
        finished, pending = futures.wait(
                 [EXCEPTION_FUTURE, future1],
                 return_when=futures.FIRST_EXCEPTION)

        self.assertEqual(set([EXCEPTION_FUTURE]), finished)
        self.assertEqual(set([future1]), pending)

    def test_all_completed(self):
        future1 = self.executor.submit(divmod, 2, 0)
        future2 = self.executor.submit(mul, 2, 21)

        finished, pending = futures.wait(
                [SUCCESSFUL_FUTURE,
                 CANCELLED_AND_NOTIFIED_FUTURE,
                 EXCEPTION_FUTURE,
                 future1,
                 future2],
                return_when=futures.ALL_COMPLETED)

        self.assertEqual(set([SUCCESSFUL_FUTURE,
                              CANCELLED_AND_NOTIFIED_FUTURE,
                              EXCEPTION_FUTURE,
                              future1,
                              future2]), finished)
        self.assertEqual(set(), pending)

    def test_timeout(self):
        future1 = self.executor.submit(mul, 6, 7)
        future2 = self.executor.submit(time.sleep, 6)

        # future2 sleeps longer than the timeout, so it must stay pending.
        finished, pending = futures.wait(
                [CANCELLED_AND_NOTIFIED_FUTURE,
                 EXCEPTION_FUTURE,
                 SUCCESSFUL_FUTURE,
                 future1, future2],
                timeout=5,
                return_when=futures.ALL_COMPLETED)

        self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
                              EXCEPTION_FUTURE,
                              SUCCESSFUL_FUTURE,
                              future1]), finished)
        self.assertEqual(set([future2]), pending)
688
689
class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, BaseTestCase):
    """Thread-pool-only wait() tests, in addition to the shared WaitTests."""

    def test_pending_calls_race(self):
        # Issue #14406: multi-threaded race condition when waiting on all
        # futures.
        event = threading.Event()
        def future_func():
            event.wait()
        oldswitchinterval = sys.getswitchinterval()
        # Force very frequent thread switches to make the race likely.
        sys.setswitchinterval(1e-6)
        try:
            fs = {self.executor.submit(future_func) for i in range(100)}
            event.set()
            futures.wait(fs, return_when=futures.ALL_COMPLETED)
        finally:
            sys.setswitchinterval(oldswitchinterval)
706
707
# Process-pool variants of WaitTests (the thread pool variant is the concrete
# ThreadPoolWaitTests class above).
create_executor_tests(WaitTests,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))
712
713
class AsCompletedTests:
    """Tests for futures.as_completed()."""
    # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
    def test_no_timeout(self):
        future1 = self.executor.submit(mul, 2, 21)
        future2 = self.executor.submit(mul, 7, 6)

        # Without a timeout, every input future is eventually yielded.
        completed = set(futures.as_completed(
                [CANCELLED_AND_NOTIFIED_FUTURE,
                 EXCEPTION_FUTURE,
                 SUCCESSFUL_FUTURE,
                 future1, future2]))
        self.assertEqual(set(
                [CANCELLED_AND_NOTIFIED_FUTURE,
                 EXCEPTION_FUTURE,
                 SUCCESSFUL_FUTURE,
                 future1, future2]),
                completed)

    def test_zero_timeout(self):
        future1 = self.executor.submit(time.sleep, 2)
        completed_futures = set()
        # With timeout=0, only the already-finished futures are yielded
        # before TimeoutError fires for the still-running one.
        try:
            for future in futures.as_completed(
                    [CANCELLED_AND_NOTIFIED_FUTURE,
                     EXCEPTION_FUTURE,
                     SUCCESSFUL_FUTURE,
                     future1],
                    timeout=0):
                completed_futures.add(future)
        except futures.TimeoutError:
            pass

        self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
                              EXCEPTION_FUTURE,
                              SUCCESSFUL_FUTURE]),
                         completed_futures)

    def test_duplicate_futures(self):
        # Issue 20367. Duplicate futures should not raise exceptions or give
        # duplicate responses.
        # Issue #31641: accept arbitrary iterables.
        future1 = self.executor.submit(time.sleep, 2)
        completed = [
            f for f in futures.as_completed(itertools.repeat(future1, 3))
        ]
        self.assertEqual(len(completed), 1)

    def test_free_reference_yielded_future(self):
        # Issue #14406: Generator should not keep references
        # to finished futures.
        futures_list = [Future() for _ in range(8)]
        futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED))
        futures_list.append(create_future(state=FINISHED, result=42))

        with self.assertRaises(futures.TimeoutError):
            for future in futures.as_completed(futures_list, timeout=0):
                futures_list.remove(future)
                wr = weakref.ref(future)
                del future
                support.gc_collect()  # For PyPy or other GCs.
                self.assertIsNone(wr())

        futures_list[0].set_result("test")
        for future in futures.as_completed(futures_list):
            futures_list.remove(future)
            wr = weakref.ref(future)
            del future
            support.gc_collect()  # For PyPy or other GCs.
            self.assertIsNone(wr())
            if futures_list:
                futures_list[0].set_result("test")

    def test_correct_timeout_exception_msg(self):
        futures_list = [CANCELLED_AND_NOTIFIED_FUTURE, PENDING_FUTURE,
                        RUNNING_FUTURE, SUCCESSFUL_FUTURE]

        # PENDING_FUTURE and RUNNING_FUTURE never finish: 2 of 4 unfinished.
        with self.assertRaises(futures.TimeoutError) as cm:
            list(futures.as_completed(futures_list, timeout=0))

        self.assertEqual(str(cm.exception), '2 (of 4) futures unfinished')
794
795
# Materialize Thread/Fork/Forkserver/Spawn variants of AsCompletedTests.
create_executor_tests(AsCompletedTests)
797
798
class ExecutorTest:
    """Core submit()/map() behavior shared by all executor types."""
    # Executor.shutdown() and context manager usage is tested by
    # ExecutorShutdownTest.
    def test_submit(self):
        future = self.executor.submit(pow, 2, 8)
        self.assertEqual(256, future.result())

    def test_submit_keyword(self):
        future = self.executor.submit(mul, 2, y=8)
        self.assertEqual(16, future.result())
        # 'self' and 'fn' must be forwarded to the callable, not swallowed
        # by submit()'s own parameters.
        future = self.executor.submit(capture, 1, self=2, fn=3)
        self.assertEqual(future.result(), ((1,), {'self': 2, 'fn': 3}))
        with self.assertRaises(TypeError):
            self.executor.submit(fn=capture, arg=1)
        with self.assertRaises(TypeError):
            self.executor.submit(arg=1)

    def test_map(self):
        self.assertEqual(
                list(self.executor.map(pow, range(10), range(10))),
                list(map(pow, range(10), range(10))))

        # chunksize must not change the results.
        self.assertEqual(
                list(self.executor.map(pow, range(10), range(10), chunksize=3)),
                list(map(pow, range(10), range(10))))

    def test_map_exception(self):
        # An exception is delivered when the failing element is reached,
        # after the earlier results have been yielded.
        i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
        self.assertEqual(i.__next__(), (0, 1))
        self.assertEqual(i.__next__(), (0, 1))
        self.assertRaises(ZeroDivisionError, i.__next__)

    def test_map_timeout(self):
        results = []
        try:
            for i in self.executor.map(time.sleep,
                                       [0, 0, 6],
                                       timeout=5):
                results.append(i)
        except futures.TimeoutError:
            pass
        else:
            self.fail('expected TimeoutError')

        # Only the two fast tasks completed before the timeout.
        self.assertEqual([None, None], results)

    def test_shutdown_race_issue12456(self):
        # Issue #12456: race condition at shutdown where trying to post a
        # sentinel in the call queue blocks (the queue is full while processes
        # have exited).
        self.executor.map(str, [2] * (self.worker_count + 1))
        self.executor.shutdown()

    @support.cpython_only
    def test_no_stale_references(self):
        # Issue #16284: check that the executors don't unnecessarily hang onto
        # references.
        my_object = MyObject()
        my_object_collected = threading.Event()
        my_object_callback = weakref.ref(
            my_object, lambda obj: my_object_collected.set())
        # Deliberately discarding the future.
        self.executor.submit(my_object.my_method)
        del my_object

        collected = my_object_collected.wait(timeout=support.SHORT_TIMEOUT)
        self.assertTrue(collected,
                        "Stale reference not collected within timeout.")

    def test_max_workers_negative(self):
        for number in (0, -1):
            with self.assertRaisesRegex(ValueError,
                                        "max_workers must be greater "
                                        "than 0"):
                self.executor_type(max_workers=number)

    def test_free_reference(self):
        # Issue #14406: Result iterator should not keep an internal
        # reference to result objects.
        for obj in self.executor.map(make_dummy_object, range(10)):
            wr = weakref.ref(obj)
            del obj
            support.gc_collect()  # For PyPy or other GCs.
            self.assertIsNone(wr())
883
884
class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, BaseTestCase):
    """Thread-pool specific behavior, layered on the generic ExecutorTest."""

    def test_map_submits_without_iteration(self):
        """Tests verifying issue 11777."""
        finished = []
        def record_finished(n):
            finished.append(n)

        # map() must schedule every call even though the returned result
        # iterator is never consumed.
        self.executor.map(record_finished, range(10))
        self.executor.shutdown(wait=True)
        self.assertCountEqual(finished, range(10))

    def test_default_workers(self):
        # Default sizing for ThreadPoolExecutor: min(32, cpu_count + 4).
        executor = self.executor_type()
        expected = min(32, (os.cpu_count() or 1) + 4)
        self.assertEqual(executor._max_workers, expected)

    def test_saturation(self):
        # With every worker blocked on the semaphore, the pool must not spawn
        # more than _max_workers threads no matter how many jobs are queued.
        executor = self.executor_type(4)
        def acquire_lock(lock):
            lock.acquire()

        sem = threading.Semaphore(0)
        job_count = 15 * executor._max_workers
        try:
            for i in range(job_count):
                executor.submit(acquire_lock, sem)
            self.assertEqual(len(executor._threads), executor._max_workers)
        finally:
            # Release the workers even if the assertion above fails;
            # otherwise shutdown(wait=True) would never be reached and the
            # threads stuck in sem.acquire() would leak into later tests.
            # (Mirrors the try/finally in ProcessPoolExecutorTest.)
            for i in range(job_count):
                sem.release()
            executor.shutdown(wait=True)

    def test_idle_thread_reuse(self):
        # Sequential, result()-synchronized submissions should all be served
        # by a single reused worker thread.
        executor = self.executor_type()
        executor.submit(mul, 21, 2).result()
        executor.submit(mul, 6, 7).result()
        executor.submit(mul, 3, 14).result()
        self.assertEqual(len(executor._threads), 1)
        executor.shutdown(wait=True)

    @unittest.skipUnless(hasattr(os, 'register_at_fork'), 'need os.register_at_fork')
    def test_hang_global_shutdown_lock(self):
        # bpo-45021: _global_shutdown_lock should be reinitialized in the child
        # process, otherwise it will never exit
        def submit(pool):
            pool.submit(submit, pool)

        with futures.ThreadPoolExecutor(1) as pool:
            # Keep the thread pool busy with recursive self-submissions while
            # repeatedly forking process pools; a child inheriting a held
            # _global_shutdown_lock would hang here.
            pool.submit(submit, pool)

            for _ in range(50):
                with futures.ProcessPoolExecutor(1, mp_context=get_context('fork')) as workers:
                    workers.submit(tuple)
935
936
class ProcessPoolExecutorTest(ExecutorTest):
    """Process-pool specific behavior, layered on the generic ExecutorTest."""

    @unittest.skipUnless(sys.platform=='win32', 'Windows-only process limit')
    def test_max_workers_too_large(self):
        # Windows limits the number of waitable handles, capping workers at 61.
        with self.assertRaisesRegex(ValueError,
                                    "max_workers must be <= 61"):
            futures.ProcessPoolExecutor(max_workers=62)

    def test_killed_child(self):
        # When a child process is abruptly terminated, the whole pool gets
        # "broken".
        # Note: named 'futs' so the module-level 'concurrent.futures' import
        # is not shadowed inside this method.
        futs = [self.executor.submit(time.sleep, 3)]
        # Get one of the processes, and terminate (kill) it
        p = next(iter(self.executor._processes.values()))
        p.terminate()
        for fut in futs:
            self.assertRaises(BrokenProcessPool, fut.result)
        # Submitting other jobs fails as well.
        self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)

    def test_map_chunksize(self):
        def bad_map():
            list(self.executor.map(pow, range(40), range(40), chunksize=-1))

        # Chunk sizes smaller than, larger than and equal to the input length
        # must all produce the same results as the builtin map().
        ref = list(map(pow, range(40), range(40)))
        self.assertEqual(
            list(self.executor.map(pow, range(40), range(40), chunksize=6)),
            ref)
        self.assertEqual(
            list(self.executor.map(pow, range(40), range(40), chunksize=50)),
            ref)
        self.assertEqual(
            list(self.executor.map(pow, range(40), range(40), chunksize=40)),
            ref)
        # A non-positive chunksize is rejected.
        self.assertRaises(ValueError, bad_map)

    @classmethod
    def _test_traceback(cls):
        raise RuntimeError(123) # some comment

    def test_traceback(self):
        # We want ensure that the traceback from the child process is
        # contained in the traceback raised in the main process.
        future = self.executor.submit(self._test_traceback)
        with self.assertRaises(Exception) as cm:
            future.result()

        exc = cm.exception
        self.assertIs(type(exc), RuntimeError)
        self.assertEqual(exc.args, (123,))
        cause = exc.__cause__
        self.assertIs(type(cause), futures.process._RemoteTraceback)
        # The remote traceback must include the original source line verbatim.
        self.assertIn('raise RuntimeError(123) # some comment', cause.tb)

        # Re-raising through the default excepthook must also show the
        # remote source line.
        with support.captured_stderr() as f1:
            try:
                raise exc
            except RuntimeError:
                sys.excepthook(*sys.exc_info())
        self.assertIn('raise RuntimeError(123) # some comment',
                      f1.getvalue())

    @hashlib_helper.requires_hashdigest('md5')
    def test_ressources_gced_in_workers(self):
        # Ensure that argument for a job are correctly gc-ed after the job
        # is finished
        mgr = get_context(self.ctx).Manager()
        obj = EventfulGCObj(mgr)
        future = self.executor.submit(id, obj)
        future.result()

        self.assertTrue(obj.event.wait(timeout=1))

        # explicitly destroy the object to ensure that EventfulGCObj.__del__()
        # is called while manager is still running.
        obj = None
        support.gc_collect()

        mgr.shutdown()
        mgr.join()

    def test_saturation(self):
        # With every worker blocked on the semaphore, the pool must not spawn
        # more than _max_workers processes no matter how many jobs are queued.
        executor = self.executor_type(4)
        # NOTE(review): the semaphore comes from the *default* start-method
        # context while the executor uses self.ctx -- confirm intentional.
        mp_context = get_context()
        sem = mp_context.Semaphore(0)
        job_count = 15 * executor._max_workers
        try:
            for _ in range(job_count):
                executor.submit(sem.acquire)
            self.assertEqual(len(executor._processes), executor._max_workers)
            for _ in range(job_count):
                sem.release()
        finally:
            executor.shutdown()

    def test_idle_process_reuse_one(self):
        # Sequential, result()-synchronized submissions should all be served
        # by a single reused worker process.
        executor = self.executor_type(4)
        executor.submit(mul, 21, 2).result()
        executor.submit(mul, 6, 7).result()
        executor.submit(mul, 3, 14).result()
        self.assertEqual(len(executor._processes), 1)
        executor.shutdown()

    def test_idle_process_reuse_multiple(self):
        # Interleaving waited and un-waited submissions should need at most
        # two worker processes.
        executor = self.executor_type(4)
        executor.submit(mul, 12, 7).result()
        executor.submit(mul, 33, 25)
        executor.submit(mul, 25, 26).result()
        executor.submit(mul, 18, 29)
        self.assertLessEqual(len(executor._processes), 2)
        executor.shutdown()
1048
# Generate one concrete test class per process start method
# (fork, forkserver, spawn).
create_executor_tests(ProcessPoolExecutorTest,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))
1053
def _crash(delay=None):
    """Kill the current process with a segmentation fault.

    If *delay* is truthy, sleep that many seconds before crashing.
    """
    import faulthandler
    if delay:
        time.sleep(delay)
    # Disable faulthandler so the SIGSEGV terminates the process instead of
    # being turned into a traceback dump.
    faulthandler.disable()
    faulthandler._sigsegv()
1061
1062
1063def _exit():
1064    """Induces a sys exit with exitcode 1."""
1065    sys.exit(1)
1066
1067
1068def _raise_error(Err):
1069    """Function that raises an Exception in process."""
1070    raise Err()
1071
1072
1073def _raise_error_ignore_stderr(Err):
1074    """Function that raises an Exception in process and ignores stderr."""
1075    import io
1076    sys.stderr = io.StringIO()
1077    raise Err()
1078
1079
1080def _return_instance(cls):
1081    """Function that returns a instance of cls."""
1082    return cls()
1083
1084
class CrashAtPickle:
    """Bad object that triggers a segfault at pickling time."""
    # Explicit 'object' base dropped: redundant in Python 3.
    def __reduce__(self):
        # Crash while pickle serializes the object (task submission path).
        _crash()
1089
1090
class CrashAtUnpickle:
    """Bad object that triggers a segfault at unpickling time."""
    # Explicit 'object' base dropped: redundant in Python 3.
    def __reduce__(self):
        # Pickles cleanly; the receiving process crashes when it calls
        # _crash() to reconstruct the object.
        return _crash, ()
1095
1096
class ExitAtPickle:
    """Bad object that triggers a process exit at pickling time."""
    # Explicit 'object' base dropped: redundant in Python 3.
    def __reduce__(self):
        # Exit while pickle serializes the object (task submission path).
        _exit()
1101
1102
class ExitAtUnpickle:
    """Bad object that triggers a process exit at unpickling time."""
    # Explicit 'object' base dropped: redundant in Python 3.
    def __reduce__(self):
        # Pickles cleanly; the receiving process exits when it calls
        # _exit() to reconstruct the object.
        return _exit, ()
1107
1108
class ErrorAtPickle:
    """Bad object that triggers an error at pickling time."""
    # Explicit 'object' base dropped: redundant in Python 3.
    def __reduce__(self):
        # Raise synchronously while pickle serializes the object.
        from pickle import PicklingError
        raise PicklingError("Error in pickle")
1114
1115
class ErrorAtUnpickle:
    """Bad object that triggers an error at unpickling time."""
    # Explicit 'object' base dropped: redundant in Python 3.
    def __reduce__(self):
        # Pickles cleanly; the receiving process raises UnpicklingError when
        # it calls the reconstruction function (with stderr silenced).
        from pickle import UnpicklingError
        return _raise_error_ignore_stderr, (UnpicklingError, )
1121
1122
class ExecutorDeadlockTest:
    # Time after which a blocked future is treated as a deadlock.
    TIMEOUT = support.SHORT_TIMEOUT

    def _fail_on_deadlock(self, executor):
        # If we did not recover before TIMEOUT seconds, consider that the
        # executor is in a deadlock state and forcefully clean all its
        # components.
        import faulthandler
        from tempfile import TemporaryFile
        # Capture a traceback of every thread for the failure message.
        with TemporaryFile(mode="w+") as f:
            faulthandler.dump_traceback(file=f)
            f.seek(0)
            tb = f.read()
        # Kill the stuck workers first so shutdown() below cannot block.
        for p in executor._processes.values():
            p.terminate()
        # This should be safe to call executor.shutdown here as all possible
        # deadlocks should have been broken.
        executor.shutdown(wait=True)
        print(f"\nTraceback:\n {tb}", file=sys.__stderr__)
        self.fail(f"Executor deadlock:\n\n{tb}")


    def _check_crash(self, error, func, *args, ignore_stderr=False):
        # test for deadlock caused by crashes in a pool
        # Shut the fixture executor down first so only the fresh 2-worker
        # pool below is involved in the induced crash.
        self.executor.shutdown(wait=True)

        executor = self.executor_type(
            max_workers=2, mp_context=get_context(self.ctx))
        res = executor.submit(func, *args)

        # Optionally swallow the worker's stderr traceback.
        if ignore_stderr:
            cm = support.captured_stderr()
        else:
            cm = contextlib.nullcontext()

        try:
            with self.assertRaises(error):
                with cm:
                    res.result(timeout=self.TIMEOUT)
        except futures.TimeoutError:
            # If we did not recover before TIMEOUT seconds,
            # consider that the executor is in a deadlock state
            self._fail_on_deadlock(executor)
        executor.shutdown(wait=True)

    def test_error_at_task_pickle(self):
        # Check problem occurring while pickling a task in
        # the task_handler thread
        self._check_crash(PicklingError, id, ErrorAtPickle())

    def test_exit_at_task_unpickle(self):
        # Check problem occurring while unpickling a task on workers
        self._check_crash(BrokenProcessPool, id, ExitAtUnpickle())

    def test_error_at_task_unpickle(self):
        # Check problem occurring while unpickling a task on workers
        self._check_crash(BrokenProcessPool, id, ErrorAtUnpickle())

    def test_crash_at_task_unpickle(self):
        # Check problem occurring while unpickling a task on workers
        self._check_crash(BrokenProcessPool, id, CrashAtUnpickle())

    def test_crash_during_func_exec_on_worker(self):
        # Check problem occurring during func execution on workers
        self._check_crash(BrokenProcessPool, _crash)

    def test_exit_during_func_exec_on_worker(self):
        # Check problem occurring during func execution on workers
        self._check_crash(SystemExit, _exit)

    def test_error_during_func_exec_on_worker(self):
        # Check problem occurring during func execution on workers
        self._check_crash(RuntimeError, _raise_error, RuntimeError)

    def test_crash_during_result_pickle_on_worker(self):
        # Check problem occurring while pickling a task result
        # on workers
        self._check_crash(BrokenProcessPool, _return_instance, CrashAtPickle)

    def test_exit_during_result_pickle_on_worker(self):
        # Check problem occurring while pickling a task result
        # on workers
        self._check_crash(SystemExit, _return_instance, ExitAtPickle)

    def test_error_during_result_pickle_on_worker(self):
        # Check problem occurring while pickling a task result
        # on workers
        self._check_crash(PicklingError, _return_instance, ErrorAtPickle)

    def test_error_during_result_unpickle_in_result_handler(self):
        # Check problem occurring while unpickling a task in
        # the result_handler thread
        self._check_crash(BrokenProcessPool,
                          _return_instance, ErrorAtUnpickle,
                          ignore_stderr=True)

    def test_exit_during_result_unpickle_in_result_handler(self):
        # Check problem occurring while unpickling a task in
        # the result_handler thread
        self._check_crash(BrokenProcessPool, _return_instance, ExitAtUnpickle)

    def test_shutdown_deadlock(self):
        # Test that the pool calling shutdown do not cause deadlock
        # if a worker fails after the shutdown call.
        self.executor.shutdown(wait=True)
        with self.executor_type(max_workers=2,
                                mp_context=get_context(self.ctx)) as executor:
            self.executor = executor  # Allow clean up in fail_on_deadlock
            f = executor.submit(_crash, delay=.1)
            executor.shutdown(wait=True)
            with self.assertRaises(BrokenProcessPool):
                f.result()

    def test_shutdown_deadlock_pickle(self):
        # Test that the pool calling shutdown with wait=False does not cause
        # a deadlock if a task fails at pickle after the shutdown call.
        # Reported in bpo-39104.
        self.executor.shutdown(wait=True)
        with self.executor_type(max_workers=2,
                                mp_context=get_context(self.ctx)) as executor:
            self.executor = executor  # Allow clean up in fail_on_deadlock

            # Start the executor and get the executor_manager_thread to collect
            # the threads and avoid dangling thread that should be cleaned up
            # asynchronously.
            executor.submit(id, 42).result()
            executor_manager = executor._executor_manager_thread

            # Submit a task that fails at pickle and shutdown the executor
            # without waiting
            f = executor.submit(id, ErrorAtPickle())
            executor.shutdown(wait=False)
            with self.assertRaises(PicklingError):
                f.result()

        # Make sure the executor is eventually shutdown and do not leave
        # dangling threads
        executor_manager.join()
1261
1262
# Generate one concrete deadlock test class per process start method
# (fork, forkserver, spawn).
create_executor_tests(ExecutorDeadlockTest,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))
1267
1268
class FutureTests(BaseTestCase):
    """Unit tests for Future state transitions, callbacks and accessors."""

    def test_done_callback_with_result(self):
        callback_result = None
        def fn(callback_future):
            nonlocal callback_result
            callback_result = callback_future.result()

        f = Future()
        f.add_done_callback(fn)
        # set_result() must invoke the callback synchronously.
        f.set_result(5)
        self.assertEqual(5, callback_result)

    def test_done_callback_with_exception(self):
        callback_exception = None
        def fn(callback_future):
            nonlocal callback_exception
            callback_exception = callback_future.exception()

        f = Future()
        f.add_done_callback(fn)
        f.set_exception(Exception('test'))
        self.assertEqual(('test',), callback_exception.args)

    def test_done_callback_with_cancel(self):
        was_cancelled = None
        def fn(callback_future):
            nonlocal was_cancelled
            was_cancelled = callback_future.cancelled()

        f = Future()
        f.add_done_callback(fn)
        self.assertTrue(f.cancel())
        self.assertTrue(was_cancelled)

    def test_done_callback_raises(self):
        # A raising callback must be logged to stderr and must not prevent
        # later callbacks from running.
        with support.captured_stderr() as stderr:
            raising_was_called = False
            fn_was_called = False

            def raising_fn(callback_future):
                nonlocal raising_was_called
                raising_was_called = True
                raise Exception('doh!')

            def fn(callback_future):
                nonlocal fn_was_called
                fn_was_called = True

            f = Future()
            f.add_done_callback(raising_fn)
            f.add_done_callback(fn)
            f.set_result(5)
            self.assertTrue(raising_was_called)
            self.assertTrue(fn_was_called)
            self.assertIn('Exception: doh!', stderr.getvalue())

    def test_done_callback_already_successful(self):
        # Adding a callback to an already-finished future runs it immediately.
        callback_result = None
        def fn(callback_future):
            nonlocal callback_result
            callback_result = callback_future.result()

        f = Future()
        f.set_result(5)
        f.add_done_callback(fn)
        self.assertEqual(5, callback_result)

    def test_done_callback_already_failed(self):
        callback_exception = None
        def fn(callback_future):
            nonlocal callback_exception
            callback_exception = callback_future.exception()

        f = Future()
        f.set_exception(Exception('test'))
        f.add_done_callback(fn)
        self.assertEqual(('test',), callback_exception.args)

    def test_done_callback_already_cancelled(self):
        was_cancelled = None
        def fn(callback_future):
            nonlocal was_cancelled
            was_cancelled = callback_future.cancelled()

        f = Future()
        self.assertTrue(f.cancel())
        f.add_done_callback(fn)
        self.assertTrue(was_cancelled)

    def test_done_callback_raises_already_succeeded(self):
        # A callback that raises when added to a finished future is logged
        # with the 'exception calling callback for' prefix.
        with support.captured_stderr() as stderr:
            def raising_fn(callback_future):
                raise Exception('doh!')

            f = Future()

            # Set the result first to simulate a future that runs instantly,
            # effectively allowing the callback to be run immediately.
            f.set_result(5)
            f.add_done_callback(raising_fn)

            self.assertIn('exception calling callback for', stderr.getvalue())
            self.assertIn('doh!', stderr.getvalue())

    def test_repr(self):
        self.assertRegex(repr(PENDING_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=pending>')
        self.assertRegex(repr(RUNNING_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=running>')
        self.assertRegex(repr(CANCELLED_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=cancelled>')
        self.assertRegex(repr(CANCELLED_AND_NOTIFIED_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=cancelled>')
        self.assertRegex(
                repr(EXCEPTION_FUTURE),
                '<Future at 0x[0-9a-f]+ state=finished raised OSError>')
        self.assertRegex(
                repr(SUCCESSFUL_FUTURE),
                '<Future at 0x[0-9a-f]+ state=finished returned int>')

    def test_cancel(self):
        # cancel() succeeds only for pending/cancelled futures; running and
        # finished futures cannot be cancelled.
        f1 = create_future(state=PENDING)
        f2 = create_future(state=RUNNING)
        f3 = create_future(state=CANCELLED)
        f4 = create_future(state=CANCELLED_AND_NOTIFIED)
        f5 = create_future(state=FINISHED, exception=OSError())
        f6 = create_future(state=FINISHED, result=5)

        self.assertTrue(f1.cancel())
        self.assertEqual(f1._state, CANCELLED)

        self.assertFalse(f2.cancel())
        self.assertEqual(f2._state, RUNNING)

        self.assertTrue(f3.cancel())
        self.assertEqual(f3._state, CANCELLED)

        self.assertTrue(f4.cancel())
        self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED)

        self.assertFalse(f5.cancel())
        self.assertEqual(f5._state, FINISHED)

        self.assertFalse(f6.cancel())
        self.assertEqual(f6._state, FINISHED)

    def test_cancelled(self):
        self.assertFalse(PENDING_FUTURE.cancelled())
        self.assertFalse(RUNNING_FUTURE.cancelled())
        self.assertTrue(CANCELLED_FUTURE.cancelled())
        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled())
        self.assertFalse(EXCEPTION_FUTURE.cancelled())
        self.assertFalse(SUCCESSFUL_FUTURE.cancelled())

    def test_done(self):
        self.assertFalse(PENDING_FUTURE.done())
        self.assertFalse(RUNNING_FUTURE.done())
        self.assertTrue(CANCELLED_FUTURE.done())
        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done())
        self.assertTrue(EXCEPTION_FUTURE.done())
        self.assertTrue(SUCCESSFUL_FUTURE.done())

    def test_running(self):
        self.assertFalse(PENDING_FUTURE.running())
        self.assertTrue(RUNNING_FUTURE.running())
        self.assertFalse(CANCELLED_FUTURE.running())
        self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running())
        self.assertFalse(EXCEPTION_FUTURE.running())
        self.assertFalse(SUCCESSFUL_FUTURE.running())

    def test_result_with_timeout(self):
        self.assertRaises(futures.TimeoutError,
                          PENDING_FUTURE.result, timeout=0)
        self.assertRaises(futures.TimeoutError,
                          RUNNING_FUTURE.result, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_FUTURE.result, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0)
        self.assertRaises(OSError, EXCEPTION_FUTURE.result, timeout=0)
        self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)

    def test_result_with_success(self):
        # TODO(brian@sweetapp.com): This test is timing dependent.
        def notification():
            # Wait until the main thread is waiting for the result.
            time.sleep(1)
            f1.set_result(42)

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()

        self.assertEqual(f1.result(timeout=5), 42)
        t.join()

    def test_result_with_cancel(self):
        # TODO(brian@sweetapp.com): This test is timing dependent.
        def notification():
            # Wait until the main thread is waiting for the result.
            time.sleep(1)
            f1.cancel()

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()

        self.assertRaises(futures.CancelledError,
                          f1.result, timeout=support.SHORT_TIMEOUT)
        t.join()

    def test_exception_with_timeout(self):
        self.assertRaises(futures.TimeoutError,
                          PENDING_FUTURE.exception, timeout=0)
        self.assertRaises(futures.TimeoutError,
                          RUNNING_FUTURE.exception, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_FUTURE.exception, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0)
        # assertIsInstance/assertIsNone give clearer failure messages than
        # assertTrue(isinstance(...)) / assertEqual(..., None).
        self.assertIsInstance(EXCEPTION_FUTURE.exception(timeout=0), OSError)
        self.assertIsNone(SUCCESSFUL_FUTURE.exception(timeout=0))

    def test_exception_with_success(self):
        def notification():
            # Wait until the main thread is waiting for the exception.
            time.sleep(1)
            with f1._condition:
                f1._state = FINISHED
                f1._exception = OSError()
                f1._condition.notify_all()

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()

        self.assertIsInstance(f1.exception(timeout=support.SHORT_TIMEOUT),
                              OSError)
        t.join()

    def test_multiple_set_result(self):
        # A second set_result() must raise InvalidStateError and leave the
        # first result in place.
        f = create_future(state=PENDING)
        f.set_result(1)

        with self.assertRaisesRegex(
                futures.InvalidStateError,
                'FINISHED: <Future at 0x[0-9a-f]+ '
                'state=finished returned int>'
        ):
            f.set_result(2)

        self.assertTrue(f.done())
        self.assertEqual(f.result(), 1)

    def test_multiple_set_exception(self):
        # A second set_exception() must raise InvalidStateError and leave the
        # first exception in place.
        f = create_future(state=PENDING)
        e = ValueError()
        f.set_exception(e)

        with self.assertRaisesRegex(
                futures.InvalidStateError,
                'FINISHED: <Future at 0x[0-9a-f]+ '
                'state=finished raised ValueError>'
        ):
            f.set_exception(Exception())

        self.assertEqual(f.exception(), e)
1538
1539
def setUpModule():
    """Register module-level cleanups: multiprocessing test state first,
    then the thread-sanity check (cleanups run in LIFO order)."""
    unittest.addModuleCleanup(multiprocessing.util._cleanup_tests)
    # threading_setup() runs now (argument evaluation); its snapshot is
    # handed straight to the cleanup that will verify no threads leaked.
    unittest.addModuleCleanup(threading_helper.threading_cleanup,
                              *threading_helper.threading_setup())
1544
1545
if __name__ == "__main__":
    # Allow running this test file directly: delegate to unittest discovery.
    unittest.main()
1548