1import itertools 2import time 3import unittest 4import weakref 5from concurrent import futures 6from concurrent.futures._base import ( 7 CANCELLED_AND_NOTIFIED, FINISHED, Future) 8 9from test import support 10 11from .util import ( 12 PENDING_FUTURE, RUNNING_FUTURE, 13 CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE, SUCCESSFUL_FUTURE, 14 create_future, create_executor_tests, setup_module) 15 16 17def mul(x, y): 18 return x * y 19 20 21class AsCompletedTests: 22 def test_no_timeout(self): 23 future1 = self.executor.submit(mul, 2, 21) 24 future2 = self.executor.submit(mul, 7, 6) 25 26 completed = set(futures.as_completed( 27 [CANCELLED_AND_NOTIFIED_FUTURE, 28 EXCEPTION_FUTURE, 29 SUCCESSFUL_FUTURE, 30 future1, future2])) 31 self.assertEqual(set( 32 [CANCELLED_AND_NOTIFIED_FUTURE, 33 EXCEPTION_FUTURE, 34 SUCCESSFUL_FUTURE, 35 future1, future2]), 36 completed) 37 38 def test_future_times_out(self): 39 """Test ``futures.as_completed`` timing out before 40 completing it's final future.""" 41 already_completed = {CANCELLED_AND_NOTIFIED_FUTURE, 42 EXCEPTION_FUTURE, 43 SUCCESSFUL_FUTURE} 44 45 # Windows clock resolution is around 15.6 ms 46 short_timeout = 0.100 47 for timeout in (0, short_timeout): 48 with self.subTest(timeout): 49 50 completed_futures = set() 51 future = self.executor.submit(time.sleep, short_timeout * 10) 52 53 try: 54 for f in futures.as_completed( 55 already_completed | {future}, 56 timeout 57 ): 58 completed_futures.add(f) 59 except futures.TimeoutError: 60 pass 61 62 # Check that ``future`` wasn't completed. 63 self.assertEqual(completed_futures, already_completed) 64 65 def test_duplicate_futures(self): 66 # Issue 20367. Duplicate futures should not raise exceptions or give 67 # duplicate responses. 68 # Issue #31641: accept arbitrary iterables. 69 future1 = self.executor.submit(time.sleep, 2) 70 completed = [ 71 f for f in futures.as_completed(itertools.repeat(future1, 3)) 72 ] 73 self.assertEqual(len(completed), 1) 74 75 def test_free_reference_yielded_future(self): 76 # Issue #14406: Generator should not keep references 77 # to finished futures. 78 futures_list = [Future() for _ in range(8)] 79 futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED)) 80 futures_list.append(create_future(state=FINISHED, result=42)) 81 82 with self.assertRaises(futures.TimeoutError): 83 for future in futures.as_completed(futures_list, timeout=0): 84 futures_list.remove(future) 85 wr = weakref.ref(future) 86 del future 87 support.gc_collect() # For PyPy or other GCs. 88 self.assertIsNone(wr()) 89 90 futures_list[0].set_result("test") 91 for future in futures.as_completed(futures_list): 92 futures_list.remove(future) 93 wr = weakref.ref(future) 94 del future 95 support.gc_collect() # For PyPy or other GCs. 96 self.assertIsNone(wr()) 97 if futures_list: 98 futures_list[0].set_result("test") 99 100 def test_correct_timeout_exception_msg(self): 101 futures_list = [CANCELLED_AND_NOTIFIED_FUTURE, PENDING_FUTURE, 102 RUNNING_FUTURE, SUCCESSFUL_FUTURE] 103 104 with self.assertRaises(futures.TimeoutError) as cm: 105 list(futures.as_completed(futures_list, timeout=0)) 106 107 self.assertEqual(str(cm.exception), '2 (of 4) futures unfinished') 108 109 110create_executor_tests(globals(), AsCompletedTests) 111 112 113def setUpModule(): 114 setup_module() 115 116 117if __name__ == "__main__": 118 unittest.main() 119