• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import itertools
2import time
3import unittest
4import weakref
5from concurrent import futures
6from concurrent.futures._base import (
7    CANCELLED_AND_NOTIFIED, FINISHED, Future)
8
9from test import support
10
11from .util import (
12    PENDING_FUTURE, RUNNING_FUTURE,
13    CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE, SUCCESSFUL_FUTURE,
14    create_future, create_executor_tests, setup_module)
15
16
def mul(x, y):
    """Return ``x * y``.

    Kept as a plain module-level function; the tests below submit it to
    ``self.executor`` as a quick task with a known result.
    """
    product = x * y
    return product
19
20
class AsCompletedTests:
    """Checks for ``futures.as_completed``.

    The methods rely on ``self.executor`` and on ``unittest.TestCase``
    assertion helpers, neither of which is defined in this class, so it is
    meant to be combined with other classes that provide them.
    """

    def test_no_timeout(self):
        # With no timeout, as_completed() must eventually yield every
        # future: the pre-built ones and the two freshly submitted ones.
        first = self.executor.submit(mul, 2, 21)
        second = self.executor.submit(mul, 7, 6)
        expected = [
            CANCELLED_AND_NOTIFIED_FUTURE,
            EXCEPTION_FUTURE,
            SUCCESSFUL_FUTURE,
            first,
            second,
        ]

        yielded = set(futures.as_completed(expected))

        self.assertEqual(set(expected), yielded)

    def test_future_times_out(self):
        """Test ``futures.as_completed`` timing out before
        completing its final future."""
        finished = {
            CANCELLED_AND_NOTIFIED_FUTURE,
            EXCEPTION_FUTURE,
            SUCCESSFUL_FUTURE,
        }

        # Windows clock resolution is around 15.6 ms
        short_timeout = 0.100
        for timeout in (0, short_timeout):
            with self.subTest(timeout):
                seen = set()
                # The task sleeps ten times longer than the longest timeout
                # used here, so it is still running when the timeout hits.
                slow = self.executor.submit(time.sleep, short_timeout * 10)

                try:
                    for done in futures.as_completed(finished | {slow},
                                                     timeout):
                        seen.add(done)
                except futures.TimeoutError:
                    # Expected: the slow future cannot finish in time.
                    pass

                # Check that ``slow`` wasn't completed.
                self.assertEqual(seen, finished)

    def test_duplicate_futures(self):
        # Issue 20367. Duplicate futures should not raise exceptions or give
        # duplicate responses.
        # Issue #31641: accept arbitrary iterables.
        sleeper = self.executor.submit(time.sleep, 2)
        repeated = itertools.repeat(sleeper, 3)
        yielded = list(futures.as_completed(repeated))
        self.assertEqual(len(yielded), 1)

    def test_free_reference_yielded_future(self):
        # Issue #14406: Generator should not keep references
        # to finished futures.
        pending = [Future() for _ in range(8)]
        pending.append(create_future(state=CANCELLED_AND_NOTIFIED))
        pending.append(create_future(state=FINISHED, result=42))

        # First pass: only the two already-finished futures can be yielded
        # before the zero timeout fires.
        with self.assertRaises(futures.TimeoutError):
            for fut in futures.as_completed(pending, timeout=0):
                # Drop every reference this test holds, keeping only a weak
                # one, so a surviving object must be kept alive elsewhere.
                pending.remove(fut)
                ref = weakref.ref(fut)
                del fut
                support.gc_collect()  # For PyPy or other GCs.
                self.assertIsNone(ref())

        # Second pass: finish the remaining futures one at a time, so each
        # loop iteration has exactly one newly completed future to yield.
        pending[0].set_result("test")
        for fut in futures.as_completed(pending):
            pending.remove(fut)
            ref = weakref.ref(fut)
            del fut
            support.gc_collect()  # For PyPy or other GCs.
            self.assertIsNone(ref())
            if pending:
                pending[0].set_result("test")

    def test_correct_timeout_exception_msg(self):
        # Two of the four futures (pending and running) are unfinished, and
        # the TimeoutError message must report exactly that count.
        mixed = [
            CANCELLED_AND_NOTIFIED_FUTURE,
            PENDING_FUTURE,
            RUNNING_FUTURE,
            SUCCESSFUL_FUTURE,
        ]

        with self.assertRaises(futures.TimeoutError) as caught:
            list(futures.as_completed(mixed, timeout=0))

        self.assertEqual(str(caught.exception), '2 (of 4) futures unfinished')
108
109
# Hand the mixin above and this module's namespace to the shared helper
# imported from ``.util``.
# NOTE(review): presumably the helper defines one concrete TestCase per
# executor flavour inside ``globals()`` so that unittest discovers them --
# confirm against ``.util``.
create_executor_tests(globals(), AsCompletedTests)
111
112
def setUpModule():
    # Module-level fixture hook: unittest calls a function with this name
    # once before running the tests in this module.  The work itself is
    # delegated to ``setup_module`` from ``.util``; what it sets up is not
    # visible in this file.
    setup_module()
115
116
# Run this module's tests when it is executed as the main program.
if __name__ == "__main__":
    unittest.main()
119