• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import os
2import subprocess
3import sys
4import threading
5import functools
6import contextlib
7import logging
8import re
9import time
10import gc
11import traceback
12from StringIO import StringIO
13from test import test_support
14
15from concurrent import futures
16from concurrent.futures._base import (
17    PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future)
18from concurrent.futures.thread import cpu_count
19
20try:
21    import unittest2 as unittest
22except ImportError:
23    import unittest
24
25
26def reap_threads(func):
27    """Use this function when threads are being used.  This will
28    ensure that the threads are cleaned up even when the test fails.
29    If threading is unavailable this function does nothing.
30    """
31    @functools.wraps(func)
32    def decorator(*args):
33        key = test_support.threading_setup()
34        try:
35            return func(*args)
36        finally:
37            test_support.threading_cleanup(*key)
38    return decorator
39
40
41# Executing the interpreter in a subprocess
42def _assert_python(expected_success, *args, **env_vars):
43    cmd_line = [sys.executable]
44    if not env_vars:
45        cmd_line.append('-E')
46    # Need to preserve the original environment, for in-place testing of
47    # shared library builds.
48    env = os.environ.copy()
49    # But a special flag that can be set to override -- in this case, the
50    # caller is responsible to pass the full environment.
51    if env_vars.pop('__cleanenv', None):
52        env = {}
53    env.update(env_vars)
54    cmd_line.extend(args)
55    p = subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
56                         stdout=subprocess.PIPE, stderr=subprocess.PIPE,
57                         env=env)
58    try:
59        out, err = p.communicate()
60    finally:
61        subprocess._cleanup()
62        p.stdout.close()
63        p.stderr.close()
64    rc = p.returncode
65    err = strip_python_stderr(err)
66    if (rc and expected_success) or (not rc and not expected_success):
67        raise AssertionError(
68            "Process return code is %d, "
69            "stderr follows:\n%s" % (rc, err.decode('ascii', 'ignore')))
70    return rc, out, err
71
72
73def assert_python_ok(*args, **env_vars):
74    """
75    Assert that running the interpreter with `args` and optional environment
76    variables `env_vars` is ok and return a (return code, stdout, stderr) tuple.
77    """
78    return _assert_python(True, *args, **env_vars)
79
80
81def strip_python_stderr(stderr):
82    """Strip the stderr of a Python process from potential debug output
83    emitted by the interpreter.
84
85    This will typically be run on the result of the communicate() method
86    of a subprocess.Popen object.
87    """
88    stderr = re.sub(r"\[\d+ refs\]\r?\n?$".encode(), "".encode(), stderr).strip()
89    return stderr
90
91
92@contextlib.contextmanager
93def captured_stderr():
94    """Return a context manager used by captured_stdout/stdin/stderr
95    that temporarily replaces the sys stream *stream_name* with a StringIO."""
96    logging_stream = StringIO()
97    handler = logging.StreamHandler(logging_stream)
98    logging.root.addHandler(handler)
99
100    try:
101        yield logging_stream
102    finally:
103        logging.root.removeHandler(handler)
104
105
106def create_future(state=PENDING, exception=None, result=None):
107    f = Future()
108    f._state = state
109    f._exception = exception
110    f._result = result
111    return f
112
113
114PENDING_FUTURE = create_future(state=PENDING)
115RUNNING_FUTURE = create_future(state=RUNNING)
116CANCELLED_FUTURE = create_future(state=CANCELLED)
117CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
118EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError())
119SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)
120
121
122def mul(x, y):
123    return x * y
124
125
126def sleep_and_raise(t):
127    time.sleep(t)
128    raise Exception('this is an exception')
129
130def sleep_and_print(t, msg):
131    time.sleep(t)
132    print(msg)
133    sys.stdout.flush()
134
135
136class ExecutorMixin:
137    worker_count = 5
138
139    def setUp(self):
140        self.t1 = time.time()
141        try:
142            self.executor = self.executor_type(max_workers=self.worker_count)
143        except NotImplementedError:
144            e = sys.exc_info()[1]
145            self.skipTest(str(e))
146        self._prime_executor()
147
148    def tearDown(self):
149        self.executor.shutdown(wait=True)
150        dt = time.time() - self.t1
151        if test_support.verbose:
152            print("%.2fs" % dt)
153        self.assertLess(dt, 60, "synchronization issue: test lasted too long")
154
155    def _prime_executor(self):
156        # Make sure that the executor is ready to do work before running the
157        # tests. This should reduce the probability of timeouts in the tests.
158        futures = [self.executor.submit(time.sleep, 0.1)
159                   for _ in range(self.worker_count)]
160
161        for f in futures:
162            f.result()
163
164
165class ThreadPoolMixin(ExecutorMixin):
166    executor_type = futures.ThreadPoolExecutor
167
168
169class ProcessPoolMixin(ExecutorMixin):
170    executor_type = futures.ProcessPoolExecutor
171
172
173class ExecutorShutdownTest(unittest.TestCase):
174    def test_run_after_shutdown(self):
175        self.executor.shutdown()
176        self.assertRaises(RuntimeError,
177                          self.executor.submit,
178                          pow, 2, 5)
179
180    def test_interpreter_shutdown(self):
181        # Test the atexit hook for shutdown of worker threads and processes
182        rc, out, err = assert_python_ok('-c', """if 1:
183            from concurrent.futures import %s
184            from time import sleep
185            from test_futures import sleep_and_print
186            t = %s(5)
187            t.submit(sleep_and_print, 1.0, "apple")
188            """ % (self.executor_type.__name__, self.executor_type.__name__))
189        # Errors in atexit hooks don't change the process exit code, check
190        # stderr manually.
191        self.assertFalse(err)
192        self.assertEqual(out.strip(), "apple".encode())
193
194    def test_hang_issue12364(self):
195        fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
196        self.executor.shutdown()
197        for f in fs:
198            f.result()
199
200
201class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest):
202    def _prime_executor(self):
203        pass
204
205    def test_threads_terminate(self):
206        self.executor.submit(mul, 21, 2)
207        self.executor.submit(mul, 6, 7)
208        self.executor.submit(mul, 3, 14)
209        self.assertEqual(len(self.executor._threads), 3)
210        self.executor.shutdown()
211        for t in self.executor._threads:
212            t.join()
213
214    def test_context_manager_shutdown(self):
215        with futures.ThreadPoolExecutor(max_workers=5) as e:
216            executor = e
217            self.assertEqual(list(e.map(abs, range(-5, 5))),
218                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
219
220        for t in executor._threads:
221            t.join()
222
223    def test_del_shutdown(self):
224        executor = futures.ThreadPoolExecutor(max_workers=5)
225        executor.map(abs, range(-5, 5))
226        threads = executor._threads
227        del executor
228        gc.collect()
229
230        for t in threads:
231            t.join()
232
233    def test_thread_names_assigned(self):
234        executor = futures.ThreadPoolExecutor(
235            max_workers=5, thread_name_prefix='SpecialPool')
236        executor.map(abs, range(-5, 5))
237        threads = executor._threads
238        del executor
239        gc.collect()
240
241        for t in threads:
242            self.assertRegexpMatches(t.name, r'^SpecialPool_[0-4]$')
243            t.join()
244
245    def test_thread_names_default(self):
246        executor = futures.ThreadPoolExecutor(max_workers=5)
247        executor.map(abs, range(-5, 5))
248        threads = executor._threads
249        del executor
250        gc.collect()
251
252        for t in threads:
253            # Ensure that our default name is reasonably sane and unique when
254            # no thread_name_prefix was supplied.
255            self.assertRegexpMatches(t.name, r'ThreadPoolExecutor-\d+_[0-4]$')
256            t.join()
257
258
259class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
260    def _prime_executor(self):
261        pass
262
263    def test_processes_terminate(self):
264        self.executor.submit(mul, 21, 2)
265        self.executor.submit(mul, 6, 7)
266        self.executor.submit(mul, 3, 14)
267        self.assertEqual(len(self.executor._processes), 5)
268        processes = self.executor._processes
269        self.executor.shutdown()
270
271        for p in processes:
272            p.join()
273
274    def test_context_manager_shutdown(self):
275        with futures.ProcessPoolExecutor(max_workers=5) as e:
276            processes = e._processes
277            self.assertEqual(list(e.map(abs, range(-5, 5))),
278                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
279
280        for p in processes:
281            p.join()
282
283    def test_del_shutdown(self):
284        executor = futures.ProcessPoolExecutor(max_workers=5)
285        list(executor.map(abs, range(-5, 5)))
286        queue_management_thread = executor._queue_management_thread
287        processes = executor._processes
288        del executor
289        gc.collect()
290
291        queue_management_thread.join()
292        for p in processes:
293            p.join()
294
295
296class WaitTests(unittest.TestCase):
297
298    def test_first_completed(self):
299        future1 = self.executor.submit(mul, 21, 2)
300        future2 = self.executor.submit(time.sleep, 1.5)
301
302        done, not_done = futures.wait(
303                [CANCELLED_FUTURE, future1, future2],
304                 return_when=futures.FIRST_COMPLETED)
305
306        self.assertEqual(set([future1]), done)
307        self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)
308
309    def test_first_completed_some_already_completed(self):
310        future1 = self.executor.submit(time.sleep, 1.5)
311
312        finished, pending = futures.wait(
313                 [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
314                 return_when=futures.FIRST_COMPLETED)
315
316        self.assertEqual(
317                set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
318                finished)
319        self.assertEqual(set([future1]), pending)
320
321    def test_first_exception(self):
322        future1 = self.executor.submit(mul, 2, 21)
323        future2 = self.executor.submit(sleep_and_raise, 1.5)
324        future3 = self.executor.submit(time.sleep, 3)
325
326        finished, pending = futures.wait(
327                [future1, future2, future3],
328                return_when=futures.FIRST_EXCEPTION)
329
330        self.assertEqual(set([future1, future2]), finished)
331        self.assertEqual(set([future3]), pending)
332
333    def test_first_exception_some_already_complete(self):
334        future1 = self.executor.submit(divmod, 21, 0)
335        future2 = self.executor.submit(time.sleep, 1.5)
336
337        finished, pending = futures.wait(
338                [SUCCESSFUL_FUTURE,
339                 CANCELLED_FUTURE,
340                 CANCELLED_AND_NOTIFIED_FUTURE,
341                 future1, future2],
342                return_when=futures.FIRST_EXCEPTION)
343
344        self.assertEqual(set([SUCCESSFUL_FUTURE,
345                              CANCELLED_AND_NOTIFIED_FUTURE,
346                              future1]), finished)
347        self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)
348
349    def test_first_exception_one_already_failed(self):
350        future1 = self.executor.submit(time.sleep, 2)
351
352        finished, pending = futures.wait(
353                 [EXCEPTION_FUTURE, future1],
354                 return_when=futures.FIRST_EXCEPTION)
355
356        self.assertEqual(set([EXCEPTION_FUTURE]), finished)
357        self.assertEqual(set([future1]), pending)
358
359    def test_all_completed(self):
360        future1 = self.executor.submit(divmod, 2, 0)
361        future2 = self.executor.submit(mul, 2, 21)
362
363        finished, pending = futures.wait(
364                [SUCCESSFUL_FUTURE,
365                 CANCELLED_AND_NOTIFIED_FUTURE,
366                 EXCEPTION_FUTURE,
367                 future1,
368                 future2],
369                return_when=futures.ALL_COMPLETED)
370
371        self.assertEqual(set([SUCCESSFUL_FUTURE,
372                              CANCELLED_AND_NOTIFIED_FUTURE,
373                              EXCEPTION_FUTURE,
374                              future1,
375                              future2]), finished)
376        self.assertEqual(set(), pending)
377
378    def test_timeout(self):
379        future1 = self.executor.submit(mul, 6, 7)
380        future2 = self.executor.submit(time.sleep, 3)
381
382        finished, pending = futures.wait(
383                [CANCELLED_AND_NOTIFIED_FUTURE,
384                 EXCEPTION_FUTURE,
385                 SUCCESSFUL_FUTURE,
386                 future1, future2],
387                timeout=1.5,
388                return_when=futures.ALL_COMPLETED)
389
390        self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
391                              EXCEPTION_FUTURE,
392                              SUCCESSFUL_FUTURE,
393                              future1]), finished)
394        self.assertEqual(set([future2]), pending)
395
396
397class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests):
398
399    def test_pending_calls_race(self):
400        # Issue #14406: multi-threaded race condition when waiting on all
401        # futures.
402        event = threading.Event()
403        def future_func():
404            event.wait()
405        oldswitchinterval = sys.getcheckinterval()
406        sys.setcheckinterval(1)
407        try:
408            fs = set(self.executor.submit(future_func) for i in range(100))
409            event.set()
410            futures.wait(fs, return_when=futures.ALL_COMPLETED)
411        finally:
412            sys.setcheckinterval(oldswitchinterval)
413
414
415class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests):
416    pass
417
418
419class AsCompletedTests(unittest.TestCase):
420    # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
421    def test_no_timeout(self):
422        future1 = self.executor.submit(mul, 2, 21)
423        future2 = self.executor.submit(mul, 7, 6)
424
425        completed = set(futures.as_completed(
426                [CANCELLED_AND_NOTIFIED_FUTURE,
427                 EXCEPTION_FUTURE,
428                 SUCCESSFUL_FUTURE,
429                 future1, future2]))
430        self.assertEqual(set(
431                [CANCELLED_AND_NOTIFIED_FUTURE,
432                 EXCEPTION_FUTURE,
433                 SUCCESSFUL_FUTURE,
434                 future1, future2]),
435                completed)
436
437    def test_zero_timeout(self):
438        future1 = self.executor.submit(time.sleep, 2)
439        completed_futures = set()
440        try:
441            for future in futures.as_completed(
442                    [CANCELLED_AND_NOTIFIED_FUTURE,
443                     EXCEPTION_FUTURE,
444                     SUCCESSFUL_FUTURE,
445                     future1],
446                    timeout=0):
447                completed_futures.add(future)
448        except futures.TimeoutError:
449            pass
450
451        self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
452                              EXCEPTION_FUTURE,
453                              SUCCESSFUL_FUTURE]),
454                         completed_futures)
455
456    def test_duplicate_futures(self):
457        # Issue 20367. Duplicate futures should not raise exceptions or give
458        # duplicate responses.
459        future1 = self.executor.submit(time.sleep, 2)
460        completed = [f for f in futures.as_completed([future1,future1])]
461        self.assertEqual(len(completed), 1)
462
463
464class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests):
465    pass
466
467
468class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests):
469    pass
470
471
472class ExecutorTest(unittest.TestCase):
473    # Executor.shutdown() and context manager usage is tested by
474    # ExecutorShutdownTest.
475    def test_submit(self):
476        future = self.executor.submit(pow, 2, 8)
477        self.assertEqual(256, future.result())
478
479    def test_submit_keyword(self):
480        future = self.executor.submit(mul, 2, y=8)
481        self.assertEqual(16, future.result())
482
483    def test_map(self):
484        self.assertEqual(
485                list(self.executor.map(pow, range(10), range(10))),
486                list(map(pow, range(10), range(10))))
487
488    def test_map_exception(self):
489        i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
490        self.assertEqual(next(i), (0, 1))
491        self.assertEqual(next(i), (0, 1))
492        self.assertRaises(ZeroDivisionError, next, i)
493
494    def test_map_timeout(self):
495        results = []
496        try:
497            for i in self.executor.map(time.sleep,
498                                       [0, 0, 3],
499                                       timeout=1.5):
500                results.append(i)
501        except futures.TimeoutError:
502            pass
503        else:
504            self.fail('expected TimeoutError')
505
506        self.assertEqual([None, None], results)
507
508    def test_max_workers_negative(self):
509        for number in (0, -1):
510            with self.assertRaises(ValueError) as cm:
511                self.executor_type(max_workers=number)
512
513            assert str(cm.exception) == "max_workers must be greater than 0"
514
515
516class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest):
517    def test_map_submits_without_iteration(self):
518        """Tests verifying issue 11777."""
519        finished = []
520        def record_finished(n):
521            finished.append(n)
522
523        self.executor.map(record_finished, range(10))
524        self.executor.shutdown(wait=True)
525        self.assertEqual(len(finished), 10)
526
527    def test_default_workers(self):
528        executor = self.executor_type()
529        self.assertEqual(executor._max_workers,
530                         (cpu_count() or 1) * 5)
531
532
533class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest):
534    pass
535
536
537class FutureTests(unittest.TestCase):
538    def test_done_callback_with_result(self):
539        callback_result = [None]
540        def fn(callback_future):
541            callback_result[0] = callback_future.result()
542
543        f = Future()
544        f.add_done_callback(fn)
545        f.set_result(5)
546        self.assertEqual(5, callback_result[0])
547
548    def test_done_callback_with_exception(self):
549        callback_exception = [None]
550        def fn(callback_future):
551            callback_exception[0] = callback_future.exception()
552
553        f = Future()
554        f.add_done_callback(fn)
555        f.set_exception(Exception('test'))
556        self.assertEqual(('test',), callback_exception[0].args)
557
558    def test_done_callback_with_cancel(self):
559        was_cancelled = [None]
560        def fn(callback_future):
561            was_cancelled[0] = callback_future.cancelled()
562
563        f = Future()
564        f.add_done_callback(fn)
565        self.assertTrue(f.cancel())
566        self.assertTrue(was_cancelled[0])
567
568    def test_done_callback_raises(self):
569        with captured_stderr() as stderr:
570            raising_was_called = [False]
571            raising_old_style_was_called = [False]
572            fn_was_called = [False]
573
574            def raising_fn(callback_future):
575                raising_was_called[0] = True
576                raise Exception('doh!')
577
578            def raising_old_style_fn(callback_future):
579                raising_old_style_was_called[0] = True
580                class OldStyle:  # Does not inherit from object
581                   def __str__(self):
582                       return 'doh!'
583                raise OldStyle()
584
585            def fn(callback_future):
586                fn_was_called[0] = True
587
588            f = Future()
589            f.add_done_callback(raising_fn)
590            f.add_done_callback(raising_old_style_fn)
591            f.add_done_callback(fn)
592            f.set_result(5)
593            self.assertTrue(raising_was_called)
594            self.assertTrue(raising_old_style_was_called)
595            self.assertTrue(fn_was_called)
596            self.assertIn('Exception: doh!', stderr.getvalue())
597            self.assertIn('OldStyle: doh!', stderr.getvalue())
598
599    def test_done_callback_already_successful(self):
600        callback_result = [None]
601        def fn(callback_future):
602            callback_result[0] = callback_future.result()
603
604        f = Future()
605        f.set_result(5)
606        f.add_done_callback(fn)
607        self.assertEqual(5, callback_result[0])
608
609    def test_done_callback_already_failed(self):
610        callback_exception = [None]
611        def fn(callback_future):
612            callback_exception[0] = callback_future.exception()
613
614        f = Future()
615        f.set_exception(Exception('test'))
616        f.add_done_callback(fn)
617        self.assertEqual(('test',), callback_exception[0].args)
618
619    def test_done_callback_already_cancelled(self):
620        was_cancelled = [None]
621        def fn(callback_future):
622            was_cancelled[0] = callback_future.cancelled()
623
624        f = Future()
625        self.assertTrue(f.cancel())
626        f.add_done_callback(fn)
627        self.assertTrue(was_cancelled[0])
628
629    def test_repr(self):
630        self.assertRegexpMatches(repr(PENDING_FUTURE),
631                                 '<Future at 0x[0-9a-f]+L? state=pending>')
632        self.assertRegexpMatches(repr(RUNNING_FUTURE),
633                                 '<Future at 0x[0-9a-f]+L? state=running>')
634        self.assertRegexpMatches(repr(CANCELLED_FUTURE),
635                                 '<Future at 0x[0-9a-f]+L? state=cancelled>')
636        self.assertRegexpMatches(repr(CANCELLED_AND_NOTIFIED_FUTURE),
637                                 '<Future at 0x[0-9a-f]+L? state=cancelled>')
638        self.assertRegexpMatches(
639                repr(EXCEPTION_FUTURE),
640                '<Future at 0x[0-9a-f]+L? state=finished raised IOError>')
641        self.assertRegexpMatches(
642                repr(SUCCESSFUL_FUTURE),
643                '<Future at 0x[0-9a-f]+L? state=finished returned int>')
644
645    def test_cancel(self):
646        f1 = create_future(state=PENDING)
647        f2 = create_future(state=RUNNING)
648        f3 = create_future(state=CANCELLED)
649        f4 = create_future(state=CANCELLED_AND_NOTIFIED)
650        f5 = create_future(state=FINISHED, exception=IOError())
651        f6 = create_future(state=FINISHED, result=5)
652
653        self.assertTrue(f1.cancel())
654        self.assertEqual(f1._state, CANCELLED)
655
656        self.assertFalse(f2.cancel())
657        self.assertEqual(f2._state, RUNNING)
658
659        self.assertTrue(f3.cancel())
660        self.assertEqual(f3._state, CANCELLED)
661
662        self.assertTrue(f4.cancel())
663        self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED)
664
665        self.assertFalse(f5.cancel())
666        self.assertEqual(f5._state, FINISHED)
667
668        self.assertFalse(f6.cancel())
669        self.assertEqual(f6._state, FINISHED)
670
671    def test_cancelled(self):
672        self.assertFalse(PENDING_FUTURE.cancelled())
673        self.assertFalse(RUNNING_FUTURE.cancelled())
674        self.assertTrue(CANCELLED_FUTURE.cancelled())
675        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled())
676        self.assertFalse(EXCEPTION_FUTURE.cancelled())
677        self.assertFalse(SUCCESSFUL_FUTURE.cancelled())
678
679    def test_done(self):
680        self.assertFalse(PENDING_FUTURE.done())
681        self.assertFalse(RUNNING_FUTURE.done())
682        self.assertTrue(CANCELLED_FUTURE.done())
683        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done())
684        self.assertTrue(EXCEPTION_FUTURE.done())
685        self.assertTrue(SUCCESSFUL_FUTURE.done())
686
687    def test_running(self):
688        self.assertFalse(PENDING_FUTURE.running())
689        self.assertTrue(RUNNING_FUTURE.running())
690        self.assertFalse(CANCELLED_FUTURE.running())
691        self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running())
692        self.assertFalse(EXCEPTION_FUTURE.running())
693        self.assertFalse(SUCCESSFUL_FUTURE.running())
694
695    def test_result_with_timeout(self):
696        self.assertRaises(futures.TimeoutError,
697                          PENDING_FUTURE.result, timeout=0)
698        self.assertRaises(futures.TimeoutError,
699                          RUNNING_FUTURE.result, timeout=0)
700        self.assertRaises(futures.CancelledError,
701                          CANCELLED_FUTURE.result, timeout=0)
702        self.assertRaises(futures.CancelledError,
703                          CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0)
704        self.assertRaises(IOError, EXCEPTION_FUTURE.result, timeout=0)
705        self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)
706
707    def test_result_with_success(self):
708        # TODO(brian@sweetapp.com): This test is timing dependant.
709        def notification():
710            # Wait until the main thread is waiting for the result.
711            time.sleep(1)
712            f1.set_result(42)
713
714        f1 = create_future(state=PENDING)
715        t = threading.Thread(target=notification)
716        t.start()
717
718        self.assertEqual(f1.result(timeout=5), 42)
719
720    def test_result_with_cancel(self):
721        # TODO(brian@sweetapp.com): This test is timing dependant.
722        def notification():
723            # Wait until the main thread is waiting for the result.
724            time.sleep(1)
725            f1.cancel()
726
727        f1 = create_future(state=PENDING)
728        t = threading.Thread(target=notification)
729        t.start()
730
731        self.assertRaises(futures.CancelledError, f1.result, timeout=5)
732
733    def test_exception_with_timeout(self):
734        self.assertRaises(futures.TimeoutError,
735                          PENDING_FUTURE.exception, timeout=0)
736        self.assertRaises(futures.TimeoutError,
737                          RUNNING_FUTURE.exception, timeout=0)
738        self.assertRaises(futures.CancelledError,
739                          CANCELLED_FUTURE.exception, timeout=0)
740        self.assertRaises(futures.CancelledError,
741                          CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0)
742        self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0),
743                                   IOError))
744        self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None)
745
746    def test_exception_with_success(self):
747        def notification():
748            # Wait until the main thread is waiting for the exception.
749            time.sleep(1)
750            with f1._condition:
751                f1._state = FINISHED
752                f1._exception = IOError()
753                f1._condition.notify_all()
754
755        f1 = create_future(state=PENDING)
756        t = threading.Thread(target=notification)
757        t.start()
758
759        self.assertTrue(isinstance(f1.exception(timeout=5), IOError))
760
761    def test_old_style_exception(self):
762        class OldStyle:  # Does not inherit from object
763            def  __str__(self):
764                return 'doh!'
765        callback_exc_info = [None]
766        def fn(callback_future):
767            callback_exc_info[0] = callback_future.exception_info()
768        f = Future()
769        f.add_done_callback(fn)
770        try:
771            raise OldStyle()
772        except OldStyle:
773            want_exc_info = sys.exc_info()
774            f.set_exception_info(*want_exc_info[1:])
775        self.assertEqual(f.exception_info(), want_exc_info[1:])
776        self.assertEqual(callback_exc_info[0], want_exc_info[1:])
777        try:
778            f.result()
779        except OldStyle:
780            got_exc_info = sys.exc_info()
781        else:
782            self.fail('OldStyle exception not raised')
783        self.assertEqual(got_exc_info[:2], want_exc_info[:2])
784        got_tb = traceback.extract_tb(got_exc_info[2])
785        want_tb = traceback.extract_tb(want_exc_info[2])
786        self.assertEqual(got_tb[-len(want_tb):], want_tb)
787
788@reap_threads
789def test_main():
790    try:
791        test_support.run_unittest(ProcessPoolExecutorTest,
792                                  ThreadPoolExecutorTest,
793                                  ProcessPoolWaitTests,
794                                  ThreadPoolWaitTests,
795                                  ProcessPoolAsCompletedTests,
796                                  ThreadPoolAsCompletedTests,
797                                  FutureTests,
798                                  ProcessPoolShutdownTest,
799                                  ThreadPoolShutdownTest)
800    finally:
801        test_support.reap_children()
802
803if __name__ == "__main__":
804    test_main()
805