1from test import support 2 3# Skip tests if _multiprocessing wasn't built. 4support.import_module('_multiprocessing') 5# Skip tests if sem_open implementation is broken. 6support.skip_if_broken_multiprocessing_synchronize() 7 8from test.support import hashlib_helper 9from test.support.script_helper import assert_python_ok 10 11import contextlib 12import itertools 13import logging 14from logging.handlers import QueueHandler 15import os 16import queue 17import sys 18import threading 19import time 20import unittest 21import weakref 22from pickle import PicklingError 23 24from concurrent import futures 25from concurrent.futures._base import ( 26 PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future, 27 BrokenExecutor) 28from concurrent.futures.process import BrokenProcessPool 29from multiprocessing import get_context 30 31import multiprocessing.process 32import multiprocessing.util 33 34 35def create_future(state=PENDING, exception=None, result=None): 36 f = Future() 37 f._state = state 38 f._exception = exception 39 f._result = result 40 return f 41 42 43PENDING_FUTURE = create_future(state=PENDING) 44RUNNING_FUTURE = create_future(state=RUNNING) 45CANCELLED_FUTURE = create_future(state=CANCELLED) 46CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED) 47EXCEPTION_FUTURE = create_future(state=FINISHED, exception=OSError()) 48SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42) 49 50INITIALIZER_STATUS = 'uninitialized' 51 52 53def mul(x, y): 54 return x * y 55 56def capture(*args, **kwargs): 57 return args, kwargs 58 59def sleep_and_raise(t): 60 time.sleep(t) 61 raise Exception('this is an exception') 62 63def sleep_and_print(t, msg): 64 time.sleep(t) 65 print(msg) 66 sys.stdout.flush() 67 68def init(x): 69 global INITIALIZER_STATUS 70 INITIALIZER_STATUS = x 71 72def get_init_status(): 73 return INITIALIZER_STATUS 74 75def init_fail(log_queue=None): 76 if log_queue is not None: 77 logger = logging.getLogger('concurrent.futures') 78 logger.addHandler(QueueHandler(log_queue)) 79 logger.setLevel('CRITICAL') 80 logger.propagate = False 81 time.sleep(0.1) # let some futures be scheduled 82 raise ValueError('error in initializer') 83 84 85class MyObject(object): 86 def my_method(self): 87 pass 88 89 90class EventfulGCObj(): 91 def __init__(self, mgr): 92 self.event = mgr.Event() 93 94 def __del__(self): 95 self.event.set() 96 97 98def make_dummy_object(_): 99 return MyObject() 100 101 102class BaseTestCase(unittest.TestCase): 103 def setUp(self): 104 self._thread_key = support.threading_setup() 105 106 def tearDown(self): 107 support.reap_children() 108 support.threading_cleanup(*self._thread_key) 109 110 111class ExecutorMixin: 112 worker_count = 5 113 executor_kwargs = {} 114 115 def setUp(self): 116 super().setUp() 117 118 self.t1 = time.monotonic() 119 if hasattr(self, "ctx"): 120 self.executor = self.executor_type( 121 max_workers=self.worker_count, 122 mp_context=self.get_context(), 123 **self.executor_kwargs) 124 else: 125 self.executor = self.executor_type( 126 max_workers=self.worker_count, 127 **self.executor_kwargs) 128 self._prime_executor() 129 130 def tearDown(self): 131 self.executor.shutdown(wait=True) 132 self.executor = None 133 134 dt = time.monotonic() - self.t1 135 if support.verbose: 136 print("%.2fs" % dt, end=' ') 137 self.assertLess(dt, 300, "synchronization issue: test lasted too long") 138 139 super().tearDown() 140 141 def get_context(self): 142 return get_context(self.ctx) 143 144 def _prime_executor(self): 145 # Make sure that the executor is ready to do work before running the 146 # tests. This should reduce the probability of timeouts in the tests. 147 futures = [self.executor.submit(time.sleep, 0.1) 148 for _ in range(self.worker_count)] 149 for f in futures: 150 f.result() 151 152 153class ThreadPoolMixin(ExecutorMixin): 154 executor_type = futures.ThreadPoolExecutor 155 156 157class ProcessPoolForkMixin(ExecutorMixin): 158 executor_type = futures.ProcessPoolExecutor 159 ctx = "fork" 160 161 def get_context(self): 162 if sys.platform == "win32": 163 self.skipTest("require unix system") 164 return super().get_context() 165 166 167class ProcessPoolSpawnMixin(ExecutorMixin): 168 executor_type = futures.ProcessPoolExecutor 169 ctx = "spawn" 170 171 172class ProcessPoolForkserverMixin(ExecutorMixin): 173 executor_type = futures.ProcessPoolExecutor 174 ctx = "forkserver" 175 176 def get_context(self): 177 if sys.platform == "win32": 178 self.skipTest("require unix system") 179 return super().get_context() 180 181 182def create_executor_tests(mixin, bases=(BaseTestCase,), 183 executor_mixins=(ThreadPoolMixin, 184 ProcessPoolForkMixin, 185 ProcessPoolForkserverMixin, 186 ProcessPoolSpawnMixin)): 187 def strip_mixin(name): 188 if name.endswith(('Mixin', 'Tests')): 189 return name[:-5] 190 elif name.endswith('Test'): 191 return name[:-4] 192 else: 193 return name 194 195 for exe in executor_mixins: 196 name = ("%s%sTest" 197 % (strip_mixin(exe.__name__), strip_mixin(mixin.__name__))) 198 cls = type(name, (mixin,) + (exe,) + bases, {}) 199 globals()[name] = cls 200 201 202class InitializerMixin(ExecutorMixin): 203 worker_count = 2 204 205 def setUp(self): 206 global INITIALIZER_STATUS 207 INITIALIZER_STATUS = 'uninitialized' 208 self.executor_kwargs = dict(initializer=init, 209 initargs=('initialized',)) 210 super().setUp() 211 212 def test_initializer(self): 213 futures = [self.executor.submit(get_init_status) 214 for _ in range(self.worker_count)] 215 216 for f in futures: 217 self.assertEqual(f.result(), 'initialized') 218 219 220class FailingInitializerMixin(ExecutorMixin): 221 worker_count = 2 222 223 def setUp(self): 224 if hasattr(self, "ctx"): 225 # Pass a queue to redirect the child's logging output 226 self.mp_context = self.get_context() 227 self.log_queue = self.mp_context.Queue() 228 self.executor_kwargs = dict(initializer=init_fail, 229 initargs=(self.log_queue,)) 230 else: 231 # In a thread pool, the child shares our logging setup 232 # (see _assert_logged()) 233 self.mp_context = None 234 self.log_queue = None 235 self.executor_kwargs = dict(initializer=init_fail) 236 super().setUp() 237 238 def test_initializer(self): 239 with self._assert_logged('ValueError: error in initializer'): 240 try: 241 future = self.executor.submit(get_init_status) 242 except BrokenExecutor: 243 # Perhaps the executor is already broken 244 pass 245 else: 246 with self.assertRaises(BrokenExecutor): 247 future.result() 248 # At some point, the executor should break 249 t1 = time.monotonic() 250 while not self.executor._broken: 251 if time.monotonic() - t1 > 5: 252 self.fail("executor not broken after 5 s.") 253 time.sleep(0.01) 254 # ... and from this point submit() is guaranteed to fail 255 with self.assertRaises(BrokenExecutor): 256 self.executor.submit(get_init_status) 257 258 def _prime_executor(self): 259 pass 260 261 @contextlib.contextmanager 262 def _assert_logged(self, msg): 263 if self.log_queue is not None: 264 yield 265 output = [] 266 try: 267 while True: 268 output.append(self.log_queue.get_nowait().getMessage()) 269 except queue.Empty: 270 pass 271 else: 272 with self.assertLogs('concurrent.futures', 'CRITICAL') as cm: 273 yield 274 output = cm.output 275 self.assertTrue(any(msg in line for line in output), 276 output) 277 278 279create_executor_tests(InitializerMixin) 280create_executor_tests(FailingInitializerMixin) 281 282 283class ExecutorShutdownTest: 284 def test_run_after_shutdown(self): 285 self.executor.shutdown() 286 self.assertRaises(RuntimeError, 287 self.executor.submit, 288 pow, 2, 5) 289 290 def test_interpreter_shutdown(self): 291 # Test the atexit hook for shutdown of worker threads and processes 292 rc, out, err = assert_python_ok('-c', """if 1: 293 from concurrent.futures import {executor_type} 294 from time import sleep 295 from test.test_concurrent_futures import sleep_and_print 296 if __name__ == "__main__": 297 context = '{context}' 298 if context == "": 299 t = {executor_type}(5) 300 else: 301 from multiprocessing import get_context 302 context = get_context(context) 303 t = {executor_type}(5, mp_context=context) 304 t.submit(sleep_and_print, 1.0, "apple") 305 """.format(executor_type=self.executor_type.__name__, 306 context=getattr(self, "ctx", ""))) 307 # Errors in atexit hooks don't change the process exit code, check 308 # stderr manually. 309 self.assertFalse(err) 310 self.assertEqual(out.strip(), b"apple") 311 312 def test_submit_after_interpreter_shutdown(self): 313 # Test the atexit hook for shutdown of worker threads and processes 314 rc, out, err = assert_python_ok('-c', """if 1: 315 import atexit 316 @atexit.register 317 def run_last(): 318 try: 319 t.submit(id, None) 320 except RuntimeError: 321 print("runtime-error") 322 raise 323 from concurrent.futures import {executor_type} 324 if __name__ == "__main__": 325 context = '{context}' 326 if not context: 327 t = {executor_type}(5) 328 else: 329 from multiprocessing import get_context 330 context = get_context(context) 331 t = {executor_type}(5, mp_context=context) 332 t.submit(id, 42).result() 333 """.format(executor_type=self.executor_type.__name__, 334 context=getattr(self, "ctx", ""))) 335 # Errors in atexit hooks don't change the process exit code, check 336 # stderr manually. 337 self.assertIn("RuntimeError: cannot schedule new futures", err.decode()) 338 self.assertEqual(out.strip(), b"runtime-error") 339 340 def test_hang_issue12364(self): 341 fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)] 342 self.executor.shutdown() 343 for f in fs: 344 f.result() 345 346 def test_cancel_futures(self): 347 executor = self.executor_type(max_workers=3) 348 fs = [executor.submit(time.sleep, .1) for _ in range(50)] 349 executor.shutdown(cancel_futures=True) 350 # We can't guarantee the exact number of cancellations, but we can 351 # guarantee that *some* were cancelled. With setting max_workers to 3, 352 # most of the submitted futures should have been cancelled. 353 cancelled = [fut for fut in fs if fut.cancelled()] 354 self.assertTrue(len(cancelled) >= 35, msg=f"{len(cancelled)=}") 355 356 # Ensure the other futures were able to finish. 357 # Use "not fut.cancelled()" instead of "fut.done()" to include futures 358 # that may have been left in a pending state. 359 others = [fut for fut in fs if not fut.cancelled()] 360 for fut in others: 361 self.assertTrue(fut.done(), msg=f"{fut._state=}") 362 self.assertIsNone(fut.exception()) 363 364 # Similar to the number of cancelled futures, we can't guarantee the 365 # exact number that completed. But, we can guarantee that at least 366 # one finished. 367 self.assertTrue(len(others) > 0, msg=f"{len(others)=}") 368 369 def test_hang_issue39205(self): 370 """shutdown(wait=False) doesn't hang at exit with running futures. 371 372 See https://bugs.python.org/issue39205. 373 """ 374 if self.executor_type == futures.ProcessPoolExecutor: 375 raise unittest.SkipTest( 376 "Hangs due to https://bugs.python.org/issue39205") 377 378 rc, out, err = assert_python_ok('-c', """if True: 379 from concurrent.futures import {executor_type} 380 from test.test_concurrent_futures import sleep_and_print 381 if __name__ == "__main__": 382 t = {executor_type}(max_workers=3) 383 t.submit(sleep_and_print, 1.0, "apple") 384 t.shutdown(wait=False) 385 """.format(executor_type=self.executor_type.__name__)) 386 self.assertFalse(err) 387 self.assertEqual(out.strip(), b"apple") 388 389 390class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase): 391 def _prime_executor(self): 392 pass 393 394 def test_threads_terminate(self): 395 def acquire_lock(lock): 396 lock.acquire() 397 398 sem = threading.Semaphore(0) 399 for i in range(3): 400 self.executor.submit(acquire_lock, sem) 401 self.assertEqual(len(self.executor._threads), 3) 402 for i in range(3): 403 sem.release() 404 self.executor.shutdown() 405 for t in self.executor._threads: 406 t.join() 407 408 def test_context_manager_shutdown(self): 409 with futures.ThreadPoolExecutor(max_workers=5) as e: 410 executor = e 411 self.assertEqual(list(e.map(abs, range(-5, 5))), 412 [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) 413 414 for t in executor._threads: 415 t.join() 416 417 def test_del_shutdown(self): 418 executor = futures.ThreadPoolExecutor(max_workers=5) 419 res = executor.map(abs, range(-5, 5)) 420 threads = executor._threads 421 del executor 422 423 for t in threads: 424 t.join() 425 426 # Make sure the results were all computed before the 427 # executor got shutdown. 428 assert all([r == abs(v) for r, v in zip(res, range(-5, 5))]) 429 430 def test_shutdown_no_wait(self): 431 # Ensure that the executor cleans up the threads when calling 432 # shutdown with wait=False 433 executor = futures.ThreadPoolExecutor(max_workers=5) 434 res = executor.map(abs, range(-5, 5)) 435 threads = executor._threads 436 executor.shutdown(wait=False) 437 for t in threads: 438 t.join() 439 440 # Make sure the results were all computed before the 441 # executor got shutdown. 442 assert all([r == abs(v) for r, v in zip(res, range(-5, 5))]) 443 444 445 def test_thread_names_assigned(self): 446 executor = futures.ThreadPoolExecutor( 447 max_workers=5, thread_name_prefix='SpecialPool') 448 executor.map(abs, range(-5, 5)) 449 threads = executor._threads 450 del executor 451 452 for t in threads: 453 self.assertRegex(t.name, r'^SpecialPool_[0-4]$') 454 t.join() 455 456 def test_thread_names_default(self): 457 executor = futures.ThreadPoolExecutor(max_workers=5) 458 executor.map(abs, range(-5, 5)) 459 threads = executor._threads 460 del executor 461 462 for t in threads: 463 # Ensure that our default name is reasonably sane and unique when 464 # no thread_name_prefix was supplied. 465 self.assertRegex(t.name, r'ThreadPoolExecutor-\d+_[0-4]$') 466 t.join() 467 468 def test_cancel_futures_wait_false(self): 469 # Can only be reliably tested for TPE, since PPE often hangs with 470 # `wait=False` (even without *cancel_futures*). 471 rc, out, err = assert_python_ok('-c', """if True: 472 from concurrent.futures import ThreadPoolExecutor 473 from test.test_concurrent_futures import sleep_and_print 474 if __name__ == "__main__": 475 t = ThreadPoolExecutor() 476 t.submit(sleep_and_print, .1, "apple") 477 t.shutdown(wait=False, cancel_futures=True) 478 """.format(executor_type=self.executor_type.__name__)) 479 # Errors in atexit hooks don't change the process exit code, check 480 # stderr manually. 481 self.assertFalse(err) 482 self.assertEqual(out.strip(), b"apple") 483 484 485class ProcessPoolShutdownTest(ExecutorShutdownTest): 486 def _prime_executor(self): 487 pass 488 489 def test_processes_terminate(self): 490 def acquire_lock(lock): 491 lock.acquire() 492 493 mp_context = get_context() 494 sem = mp_context.Semaphore(0) 495 for _ in range(3): 496 self.executor.submit(acquire_lock, sem) 497 self.assertEqual(len(self.executor._processes), 3) 498 for _ in range(3): 499 sem.release() 500 processes = self.executor._processes 501 self.executor.shutdown() 502 503 for p in processes.values(): 504 p.join() 505 506 def test_context_manager_shutdown(self): 507 with futures.ProcessPoolExecutor(max_workers=5) as e: 508 processes = e._processes 509 self.assertEqual(list(e.map(abs, range(-5, 5))), 510 [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) 511 512 for p in processes.values(): 513 p.join() 514 515 def test_del_shutdown(self): 516 executor = futures.ProcessPoolExecutor(max_workers=5) 517 res = executor.map(abs, range(-5, 5)) 518 executor_manager_thread = executor._executor_manager_thread 519 processes = executor._processes 520 call_queue = executor._call_queue 521 executor_manager_thread = executor._executor_manager_thread 522 del executor 523 524 # Make sure that all the executor resources were properly cleaned by 525 # the shutdown process 526 executor_manager_thread.join() 527 for p in processes.values(): 528 p.join() 529 call_queue.join_thread() 530 531 # Make sure the results were all computed before the 532 # executor got shutdown. 533 assert all([r == abs(v) for r, v in zip(res, range(-5, 5))]) 534 535 def test_shutdown_no_wait(self): 536 # Ensure that the executor cleans up the processes when calling 537 # shutdown with wait=False 538 executor = futures.ProcessPoolExecutor(max_workers=5) 539 res = executor.map(abs, range(-5, 5)) 540 processes = executor._processes 541 call_queue = executor._call_queue 542 executor_manager_thread = executor._executor_manager_thread 543 executor.shutdown(wait=False) 544 545 # Make sure that all the executor resources were properly cleaned by 546 # the shutdown process 547 executor_manager_thread.join() 548 for p in processes.values(): 549 p.join() 550 call_queue.join_thread() 551 552 # Make sure the results were all computed before the executor got 553 # shutdown. 554 assert all([r == abs(v) for r, v in zip(res, range(-5, 5))]) 555 556 557create_executor_tests(ProcessPoolShutdownTest, 558 executor_mixins=(ProcessPoolForkMixin, 559 ProcessPoolForkserverMixin, 560 ProcessPoolSpawnMixin)) 561 562 563class WaitTests: 564 565 def test_first_completed(self): 566 future1 = self.executor.submit(mul, 21, 2) 567 future2 = self.executor.submit(time.sleep, 1.5) 568 569 done, not_done = futures.wait( 570 [CANCELLED_FUTURE, future1, future2], 571 return_when=futures.FIRST_COMPLETED) 572 573 self.assertEqual(set([future1]), done) 574 self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done) 575 576 def test_first_completed_some_already_completed(self): 577 future1 = self.executor.submit(time.sleep, 1.5) 578 579 finished, pending = futures.wait( 580 [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1], 581 return_when=futures.FIRST_COMPLETED) 582 583 self.assertEqual( 584 set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]), 585 finished) 586 self.assertEqual(set([future1]), pending) 587 588 def test_first_exception(self): 589 future1 = self.executor.submit(mul, 2, 21) 590 future2 = self.executor.submit(sleep_and_raise, 1.5) 591 future3 = self.executor.submit(time.sleep, 3) 592 593 finished, pending = futures.wait( 594 [future1, future2, future3], 595 return_when=futures.FIRST_EXCEPTION) 596 597 self.assertEqual(set([future1, future2]), finished) 598 self.assertEqual(set([future3]), pending) 599 600 def test_first_exception_some_already_complete(self): 601 future1 = self.executor.submit(divmod, 21, 0) 602 future2 = self.executor.submit(time.sleep, 1.5) 603 604 finished, pending = futures.wait( 605 [SUCCESSFUL_FUTURE, 606 CANCELLED_FUTURE, 607 CANCELLED_AND_NOTIFIED_FUTURE, 608 future1, future2], 609 return_when=futures.FIRST_EXCEPTION) 610 611 self.assertEqual(set([SUCCESSFUL_FUTURE, 612 CANCELLED_AND_NOTIFIED_FUTURE, 613 future1]), finished) 614 self.assertEqual(set([CANCELLED_FUTURE, future2]), pending) 615 616 def test_first_exception_one_already_failed(self): 617 future1 = self.executor.submit(time.sleep, 2) 618 619 finished, pending = futures.wait( 620 [EXCEPTION_FUTURE, future1], 621 return_when=futures.FIRST_EXCEPTION) 622 623 self.assertEqual(set([EXCEPTION_FUTURE]), finished) 624 self.assertEqual(set([future1]), pending) 625 626 def test_all_completed(self): 627 future1 = self.executor.submit(divmod, 2, 0) 628 future2 = self.executor.submit(mul, 2, 21) 629 630 finished, pending = futures.wait( 631 [SUCCESSFUL_FUTURE, 632 CANCELLED_AND_NOTIFIED_FUTURE, 633 EXCEPTION_FUTURE, 634 future1, 635 future2], 636 return_when=futures.ALL_COMPLETED) 637 638 self.assertEqual(set([SUCCESSFUL_FUTURE, 639 CANCELLED_AND_NOTIFIED_FUTURE, 640 EXCEPTION_FUTURE, 641 future1, 642 future2]), finished) 643 self.assertEqual(set(), pending) 644 645 def test_timeout(self): 646 future1 = self.executor.submit(mul, 6, 7) 647 future2 = self.executor.submit(time.sleep, 6) 648 649 finished, pending = futures.wait( 650 [CANCELLED_AND_NOTIFIED_FUTURE, 651 EXCEPTION_FUTURE, 652 SUCCESSFUL_FUTURE, 653 future1, future2], 654 timeout=5, 655 return_when=futures.ALL_COMPLETED) 656 657 self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE, 658 EXCEPTION_FUTURE, 659 SUCCESSFUL_FUTURE, 660 future1]), finished) 661 self.assertEqual(set([future2]), pending) 662 663 664class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, BaseTestCase): 665 666 def test_pending_calls_race(self): 667 # Issue #14406: multi-threaded race condition when waiting on all 668 # futures. 669 event = threading.Event() 670 def future_func(): 671 event.wait() 672 oldswitchinterval = sys.getswitchinterval() 673 sys.setswitchinterval(1e-6) 674 try: 675 fs = {self.executor.submit(future_func) for i in range(100)} 676 event.set() 677 futures.wait(fs, return_when=futures.ALL_COMPLETED) 678 finally: 679 sys.setswitchinterval(oldswitchinterval) 680 681 682create_executor_tests(WaitTests, 683 executor_mixins=(ProcessPoolForkMixin, 684 ProcessPoolForkserverMixin, 685 ProcessPoolSpawnMixin)) 686 687 688class AsCompletedTests: 689 # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout. 690 def test_no_timeout(self): 691 future1 = self.executor.submit(mul, 2, 21) 692 future2 = self.executor.submit(mul, 7, 6) 693 694 completed = set(futures.as_completed( 695 [CANCELLED_AND_NOTIFIED_FUTURE, 696 EXCEPTION_FUTURE, 697 SUCCESSFUL_FUTURE, 698 future1, future2])) 699 self.assertEqual(set( 700 [CANCELLED_AND_NOTIFIED_FUTURE, 701 EXCEPTION_FUTURE, 702 SUCCESSFUL_FUTURE, 703 future1, future2]), 704 completed) 705 706 def test_zero_timeout(self): 707 future1 = self.executor.submit(time.sleep, 2) 708 completed_futures = set() 709 try: 710 for future in futures.as_completed( 711 [CANCELLED_AND_NOTIFIED_FUTURE, 712 EXCEPTION_FUTURE, 713 SUCCESSFUL_FUTURE, 714 future1], 715 timeout=0): 716 completed_futures.add(future) 717 except futures.TimeoutError: 718 pass 719 720 self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE, 721 EXCEPTION_FUTURE, 722 SUCCESSFUL_FUTURE]), 723 completed_futures) 724 725 def test_duplicate_futures(self): 726 # Issue 20367. Duplicate futures should not raise exceptions or give 727 # duplicate responses. 728 # Issue #31641: accept arbitrary iterables. 729 future1 = self.executor.submit(time.sleep, 2) 730 completed = [ 731 f for f in futures.as_completed(itertools.repeat(future1, 3)) 732 ] 733 self.assertEqual(len(completed), 1) 734 735 def test_free_reference_yielded_future(self): 736 # Issue #14406: Generator should not keep references 737 # to finished futures. 738 futures_list = [Future() for _ in range(8)] 739 futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED)) 740 futures_list.append(create_future(state=FINISHED, result=42)) 741 742 with self.assertRaises(futures.TimeoutError): 743 for future in futures.as_completed(futures_list, timeout=0): 744 futures_list.remove(future) 745 wr = weakref.ref(future) 746 del future 747 self.assertIsNone(wr()) 748 749 futures_list[0].set_result("test") 750 for future in futures.as_completed(futures_list): 751 futures_list.remove(future) 752 wr = weakref.ref(future) 753 del future 754 self.assertIsNone(wr()) 755 if futures_list: 756 futures_list[0].set_result("test") 757 758 def test_correct_timeout_exception_msg(self): 759 futures_list = [CANCELLED_AND_NOTIFIED_FUTURE, PENDING_FUTURE, 760 RUNNING_FUTURE, SUCCESSFUL_FUTURE] 761 762 with self.assertRaises(futures.TimeoutError) as cm: 763 list(futures.as_completed(futures_list, timeout=0)) 764 765 self.assertEqual(str(cm.exception), '2 (of 4) futures unfinished') 766 767 768create_executor_tests(AsCompletedTests) 769 770 771class ExecutorTest: 772 # Executor.shutdown() and context manager usage is tested by 773 # ExecutorShutdownTest. 774 def test_submit(self): 775 future = self.executor.submit(pow, 2, 8) 776 self.assertEqual(256, future.result()) 777 778 def test_submit_keyword(self): 779 future = self.executor.submit(mul, 2, y=8) 780 self.assertEqual(16, future.result()) 781 future = self.executor.submit(capture, 1, self=2, fn=3) 782 self.assertEqual(future.result(), ((1,), {'self': 2, 'fn': 3})) 783 with self.assertRaises(TypeError): 784 self.executor.submit(fn=capture, arg=1) 785 with self.assertRaises(TypeError): 786 self.executor.submit(arg=1) 787 788 def test_map(self): 789 self.assertEqual( 790 list(self.executor.map(pow, range(10), range(10))), 791 list(map(pow, range(10), range(10)))) 792 793 self.assertEqual( 794 list(self.executor.map(pow, range(10), range(10), chunksize=3)), 795 list(map(pow, range(10), range(10)))) 796 797 def test_map_exception(self): 798 i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5]) 799 self.assertEqual(i.__next__(), (0, 1)) 800 self.assertEqual(i.__next__(), (0, 1)) 801 self.assertRaises(ZeroDivisionError, i.__next__) 802 803 def test_map_timeout(self): 804 results = [] 805 try: 806 for i in self.executor.map(time.sleep, 807 [0, 0, 6], 808 timeout=5): 809 results.append(i) 810 except futures.TimeoutError: 811 pass 812 else: 813 self.fail('expected TimeoutError') 814 815 self.assertEqual([None, None], results) 816 817 def test_shutdown_race_issue12456(self): 818 # Issue #12456: race condition at shutdown where trying to post a 819 # sentinel in the call queue blocks (the queue is full while processes 820 # have exited). 821 self.executor.map(str, [2] * (self.worker_count + 1)) 822 self.executor.shutdown() 823 824 @support.cpython_only 825 def test_no_stale_references(self): 826 # Issue #16284: check that the executors don't unnecessarily hang onto 827 # references. 828 my_object = MyObject() 829 my_object_collected = threading.Event() 830 my_object_callback = weakref.ref( 831 my_object, lambda obj: my_object_collected.set()) 832 # Deliberately discarding the future. 833 self.executor.submit(my_object.my_method) 834 del my_object 835 836 collected = my_object_collected.wait(timeout=support.SHORT_TIMEOUT) 837 self.assertTrue(collected, 838 "Stale reference not collected within timeout.") 839 840 def test_max_workers_negative(self): 841 for number in (0, -1): 842 with self.assertRaisesRegex(ValueError, 843 "max_workers must be greater " 844 "than 0"): 845 self.executor_type(max_workers=number) 846 847 def test_free_reference(self): 848 # Issue #14406: Result iterator should not keep an internal 849 # reference to result objects. 850 for obj in self.executor.map(make_dummy_object, range(10)): 851 wr = weakref.ref(obj) 852 del obj 853 self.assertIsNone(wr()) 854 855 856class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, BaseTestCase): 857 def test_map_submits_without_iteration(self): 858 """Tests verifying issue 11777.""" 859 finished = [] 860 def record_finished(n): 861 finished.append(n) 862 863 self.executor.map(record_finished, range(10)) 864 self.executor.shutdown(wait=True) 865 self.assertCountEqual(finished, range(10)) 866 867 def test_default_workers(self): 868 executor = self.executor_type() 869 expected = min(32, (os.cpu_count() or 1) + 4) 870 self.assertEqual(executor._max_workers, expected) 871 872 def test_saturation(self): 873 executor = self.executor_type(4) 874 def acquire_lock(lock): 875 lock.acquire() 876 877 sem = threading.Semaphore(0) 878 for i in range(15 * executor._max_workers): 879 executor.submit(acquire_lock, sem) 880 self.assertEqual(len(executor._threads), executor._max_workers) 881 for i in range(15 * executor._max_workers): 882 sem.release() 883 executor.shutdown(wait=True) 884 885 def test_idle_thread_reuse(self): 886 executor = self.executor_type() 887 executor.submit(mul, 21, 2).result() 888 executor.submit(mul, 6, 7).result() 889 executor.submit(mul, 3, 14).result() 890 self.assertEqual(len(executor._threads), 1) 891 executor.shutdown(wait=True) 892 893 894class ProcessPoolExecutorTest(ExecutorTest): 895 896 @unittest.skipUnless(sys.platform=='win32', 'Windows-only process limit') 897 def test_max_workers_too_large(self): 898 with self.assertRaisesRegex(ValueError, 899 "max_workers must be <= 61"): 900 futures.ProcessPoolExecutor(max_workers=62) 901 902 def test_killed_child(self): 903 # When a child process is abruptly terminated, the whole pool gets 904 # "broken". 905 futures = [self.executor.submit(time.sleep, 3)] 906 # Get one of the processes, and terminate (kill) it 907 p = next(iter(self.executor._processes.values())) 908 p.terminate() 909 for fut in futures: 910 self.assertRaises(BrokenProcessPool, fut.result) 911 # Submitting other jobs fails as well. 912 self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8) 913 914 def test_map_chunksize(self): 915 def bad_map(): 916 list(self.executor.map(pow, range(40), range(40), chunksize=-1)) 917 918 ref = list(map(pow, range(40), range(40))) 919 self.assertEqual( 920 list(self.executor.map(pow, range(40), range(40), chunksize=6)), 921 ref) 922 self.assertEqual( 923 list(self.executor.map(pow, range(40), range(40), chunksize=50)), 924 ref) 925 self.assertEqual( 926 list(self.executor.map(pow, range(40), range(40), chunksize=40)), 927 ref) 928 self.assertRaises(ValueError, bad_map) 929 930 @classmethod 931 def _test_traceback(cls): 932 raise RuntimeError(123) # some comment 933 934 def test_traceback(self): 935 # We want ensure that the traceback from the child process is 936 # contained in the traceback raised in the main process. 937 future = self.executor.submit(self._test_traceback) 938 with self.assertRaises(Exception) as cm: 939 future.result() 940 941 exc = cm.exception 942 self.assertIs(type(exc), RuntimeError) 943 self.assertEqual(exc.args, (123,)) 944 cause = exc.__cause__ 945 self.assertIs(type(cause), futures.process._RemoteTraceback) 946 self.assertIn('raise RuntimeError(123) # some comment', cause.tb) 947 948 with support.captured_stderr() as f1: 949 try: 950 raise exc 951 except RuntimeError: 952 sys.excepthook(*sys.exc_info()) 953 self.assertIn('raise RuntimeError(123) # some comment', 954 f1.getvalue()) 955 956 @hashlib_helper.requires_hashdigest('md5') 957 def test_ressources_gced_in_workers(self): 958 # Ensure that argument for a job are correctly gc-ed after the job 959 # is finished 960 mgr = get_context(self.ctx).Manager() 961 obj = EventfulGCObj(mgr) 962 future = self.executor.submit(id, obj) 963 future.result() 964 965 self.assertTrue(obj.event.wait(timeout=1)) 966 967 # explicitly destroy the object to ensure that EventfulGCObj.__del__() 968 # is called while manager is still running. 969 obj = None 970 support.gc_collect() 971 972 mgr.shutdown() 973 mgr.join() 974 975 def test_saturation(self): 976 executor = self.executor_type(4) 977 mp_context = get_context() 978 sem = mp_context.Semaphore(0) 979 job_count = 15 * executor._max_workers 980 try: 981 for _ in range(job_count): 982 executor.submit(sem.acquire) 983 self.assertEqual(len(executor._processes), executor._max_workers) 984 for _ in range(job_count): 985 sem.release() 986 finally: 987 executor.shutdown() 988 989 def test_idle_process_reuse_one(self): 990 executor = self.executor_type(4) 991 executor.submit(mul, 21, 2).result() 992 executor.submit(mul, 6, 7).result() 993 executor.submit(mul, 3, 14).result() 994 self.assertEqual(len(executor._processes), 1) 995 executor.shutdown() 996 997 def test_idle_process_reuse_multiple(self): 998 executor = self.executor_type(4) 999 executor.submit(mul, 12, 7).result() 1000 executor.submit(mul, 33, 25) 1001 executor.submit(mul, 25, 26).result() 1002 executor.submit(mul, 18, 29) 1003 self.assertLessEqual(len(executor._processes), 2) 1004 executor.shutdown() 1005 1006create_executor_tests(ProcessPoolExecutorTest, 1007 executor_mixins=(ProcessPoolForkMixin, 1008 ProcessPoolForkserverMixin, 1009 ProcessPoolSpawnMixin)) 1010 1011def _crash(delay=None): 1012 """Induces a segfault.""" 1013 if delay: 1014 time.sleep(delay) 1015 import faulthandler 1016 faulthandler.disable() 1017 faulthandler._sigsegv() 1018 1019 1020def _exit(): 1021 """Induces a sys exit with exitcode 1.""" 1022 sys.exit(1) 1023 1024 1025def _raise_error(Err): 1026 """Function that raises an Exception in process.""" 1027 raise Err() 1028 1029 1030def _raise_error_ignore_stderr(Err): 1031 """Function that raises an Exception in process and ignores stderr.""" 1032 import io 1033 sys.stderr = io.StringIO() 1034 raise Err() 1035 1036 1037def _return_instance(cls): 1038 """Function that returns a instance of cls.""" 1039 return cls() 1040 1041 1042class CrashAtPickle(object): 1043 """Bad object that triggers a segfault at pickling time.""" 1044 def __reduce__(self): 1045 _crash() 1046 1047 1048class CrashAtUnpickle(object): 1049 """Bad object that triggers a segfault at unpickling time.""" 1050 def __reduce__(self): 1051 return _crash, () 1052 1053 1054class ExitAtPickle(object): 1055 """Bad object that triggers a process exit at pickling time.""" 1056 def __reduce__(self): 1057 _exit() 1058 1059 1060class ExitAtUnpickle(object): 1061 """Bad object that triggers a process exit at unpickling time.""" 1062 def __reduce__(self): 1063 return _exit, () 1064 1065 1066class ErrorAtPickle(object): 1067 """Bad object that triggers an error at pickling time.""" 1068 def __reduce__(self): 1069 from pickle import PicklingError 1070 raise PicklingError("Error in pickle") 1071 1072 1073class ErrorAtUnpickle(object): 1074 """Bad object that triggers an error at unpickling time.""" 1075 def __reduce__(self): 1076 from pickle import UnpicklingError 1077 return _raise_error_ignore_stderr, (UnpicklingError, ) 1078 1079 1080class ExecutorDeadlockTest: 1081 TIMEOUT = support.SHORT_TIMEOUT 1082 1083 def _fail_on_deadlock(self, executor): 1084 # If we did not recover before TIMEOUT seconds, consider that the 1085 # executor is in a deadlock state and forcefully clean all its 1086 # composants. 1087 import faulthandler 1088 from tempfile import TemporaryFile 1089 with TemporaryFile(mode="w+") as f: 1090 faulthandler.dump_traceback(file=f) 1091 f.seek(0) 1092 tb = f.read() 1093 for p in executor._processes.values(): 1094 p.terminate() 1095 # This should be safe to call executor.shutdown here as all possible 1096 # deadlocks should have been broken. 1097 executor.shutdown(wait=True) 1098 print(f"\nTraceback:\n {tb}", file=sys.__stderr__) 1099 self.fail(f"Executor deadlock:\n\n{tb}") 1100 1101 1102 def _check_crash(self, error, func, *args, ignore_stderr=False): 1103 # test for deadlock caused by crashes in a pool 1104 self.executor.shutdown(wait=True) 1105 1106 executor = self.executor_type( 1107 max_workers=2, mp_context=get_context(self.ctx)) 1108 res = executor.submit(func, *args) 1109 1110 if ignore_stderr: 1111 cm = support.captured_stderr() 1112 else: 1113 cm = contextlib.nullcontext() 1114 1115 try: 1116 with self.assertRaises(error): 1117 with cm: 1118 res.result(timeout=self.TIMEOUT) 1119 except futures.TimeoutError: 1120 # If we did not recover before TIMEOUT seconds, 1121 # consider that the executor is in a deadlock state 1122 self._fail_on_deadlock(executor) 1123 executor.shutdown(wait=True) 1124 1125 def test_error_at_task_pickle(self): 1126 # Check problem occurring while pickling a task in 1127 # the task_handler thread 1128 self._check_crash(PicklingError, id, ErrorAtPickle()) 1129 1130 def test_exit_at_task_unpickle(self): 1131 # Check problem occurring while unpickling a task on workers 1132 self._check_crash(BrokenProcessPool, id, ExitAtUnpickle()) 1133 1134 def test_error_at_task_unpickle(self): 1135 # Check problem occurring while unpickling a task on workers 1136 self._check_crash(BrokenProcessPool, id, ErrorAtUnpickle()) 1137 1138 def test_crash_at_task_unpickle(self): 1139 # Check problem occurring while unpickling a task on workers 1140 self._check_crash(BrokenProcessPool, id, CrashAtUnpickle()) 1141 1142 def test_crash_during_func_exec_on_worker(self): 1143 # Check problem occurring during func execution on workers 1144 self._check_crash(BrokenProcessPool, _crash) 1145 1146 def test_exit_during_func_exec_on_worker(self): 1147 # Check problem occurring during func execution on workers 1148 self._check_crash(SystemExit, _exit) 1149 1150 def test_error_during_func_exec_on_worker(self): 1151 # Check problem occurring during func execution on workers 1152 self._check_crash(RuntimeError, _raise_error, RuntimeError) 1153 1154 def test_crash_during_result_pickle_on_worker(self): 1155 # Check problem occurring while pickling a task result 1156 # on workers 1157 self._check_crash(BrokenProcessPool, _return_instance, CrashAtPickle) 1158 1159 def test_exit_during_result_pickle_on_worker(self): 1160 # Check problem occurring while pickling a task result 1161 # on workers 1162 self._check_crash(SystemExit, _return_instance, ExitAtPickle) 1163 1164 def test_error_during_result_pickle_on_worker(self): 1165 # Check problem occurring while pickling a task result 1166 # on workers 1167 self._check_crash(PicklingError, _return_instance, ErrorAtPickle) 1168 1169 def test_error_during_result_unpickle_in_result_handler(self): 1170 # Check problem occurring while unpickling a task in 1171 # the result_handler thread 1172 self._check_crash(BrokenProcessPool, 1173 _return_instance, ErrorAtUnpickle, 1174 ignore_stderr=True) 1175 1176 def test_exit_during_result_unpickle_in_result_handler(self): 1177 # Check problem occurring while unpickling a task in 1178 # the result_handler thread 1179 self._check_crash(BrokenProcessPool, _return_instance, ExitAtUnpickle) 1180 1181 def test_shutdown_deadlock(self): 1182 # Test that the pool calling shutdown do not cause deadlock 1183 # if a worker fails after the shutdown call. 1184 self.executor.shutdown(wait=True) 1185 with self.executor_type(max_workers=2, 1186 mp_context=get_context(self.ctx)) as executor: 1187 self.executor = executor # Allow clean up in fail_on_deadlock 1188 f = executor.submit(_crash, delay=.1) 1189 executor.shutdown(wait=True) 1190 with self.assertRaises(BrokenProcessPool): 1191 f.result() 1192 1193 def test_shutdown_deadlock_pickle(self): 1194 # Test that the pool calling shutdown with wait=False does not cause 1195 # a deadlock if a task fails at pickle after the shutdown call. 1196 # Reported in bpo-39104. 1197 self.executor.shutdown(wait=True) 1198 with self.executor_type(max_workers=2, 1199 mp_context=get_context(self.ctx)) as executor: 1200 self.executor = executor # Allow clean up in fail_on_deadlock 1201 1202 # Start the executor and get the executor_manager_thread to collect 1203 # the threads and avoid dangling thread that should be cleaned up 1204 # asynchronously. 1205 executor.submit(id, 42).result() 1206 executor_manager = executor._executor_manager_thread 1207 1208 # Submit a task that fails at pickle and shutdown the executor 1209 # without waiting 1210 f = executor.submit(id, ErrorAtPickle()) 1211 executor.shutdown(wait=False) 1212 with self.assertRaises(PicklingError): 1213 f.result() 1214 1215 # Make sure the executor is eventually shutdown and do not leave 1216 # dangling threads 1217 executor_manager.join() 1218 1219 1220create_executor_tests(ExecutorDeadlockTest, 1221 executor_mixins=(ProcessPoolForkMixin, 1222 ProcessPoolForkserverMixin, 1223 ProcessPoolSpawnMixin)) 1224 1225 1226class FutureTests(BaseTestCase): 1227 def test_done_callback_with_result(self): 1228 callback_result = None 1229 def fn(callback_future): 1230 nonlocal callback_result 1231 callback_result = callback_future.result() 1232 1233 f = Future() 1234 f.add_done_callback(fn) 1235 f.set_result(5) 1236 self.assertEqual(5, callback_result) 1237 1238 def test_done_callback_with_exception(self): 1239 callback_exception = None 1240 def fn(callback_future): 1241 nonlocal callback_exception 1242 callback_exception = callback_future.exception() 1243 1244 f = Future() 1245 f.add_done_callback(fn) 1246 f.set_exception(Exception('test')) 1247 self.assertEqual(('test',), callback_exception.args) 1248 1249 def test_done_callback_with_cancel(self): 1250 was_cancelled = None 1251 def fn(callback_future): 1252 nonlocal was_cancelled 1253 was_cancelled = callback_future.cancelled() 1254 1255 f = Future() 1256 f.add_done_callback(fn) 1257 self.assertTrue(f.cancel()) 1258 self.assertTrue(was_cancelled) 1259 1260 def test_done_callback_raises(self): 1261 with support.captured_stderr() as stderr: 1262 raising_was_called = False 1263 fn_was_called = False 1264 1265 def raising_fn(callback_future): 1266 nonlocal raising_was_called 1267 raising_was_called = True 1268 raise Exception('doh!') 1269 1270 def fn(callback_future): 1271 nonlocal fn_was_called 1272 fn_was_called = True 1273 1274 f = Future() 1275 f.add_done_callback(raising_fn) 1276 f.add_done_callback(fn) 1277 f.set_result(5) 1278 self.assertTrue(raising_was_called) 1279 self.assertTrue(fn_was_called) 1280 self.assertIn('Exception: doh!', stderr.getvalue()) 1281 1282 def test_done_callback_already_successful(self): 1283 callback_result = None 1284 def fn(callback_future): 1285 nonlocal callback_result 1286 callback_result = callback_future.result() 1287 1288 f = Future() 1289 f.set_result(5) 1290 f.add_done_callback(fn) 1291 self.assertEqual(5, callback_result) 1292 1293 def test_done_callback_already_failed(self): 1294 callback_exception = None 1295 def fn(callback_future): 1296 nonlocal callback_exception 1297 callback_exception = callback_future.exception() 1298 1299 f = Future() 1300 f.set_exception(Exception('test')) 1301 f.add_done_callback(fn) 1302 self.assertEqual(('test',), callback_exception.args) 1303 1304 def test_done_callback_already_cancelled(self): 1305 was_cancelled = None 1306 def fn(callback_future): 1307 nonlocal was_cancelled 1308 was_cancelled = callback_future.cancelled() 1309 1310 f = Future() 1311 self.assertTrue(f.cancel()) 1312 f.add_done_callback(fn) 1313 self.assertTrue(was_cancelled) 1314 1315 def test_done_callback_raises_already_succeeded(self): 1316 with support.captured_stderr() as stderr: 1317 def raising_fn(callback_future): 1318 raise Exception('doh!') 1319 1320 f = Future() 1321 1322 # Set the result first to simulate a future that runs instantly, 1323 # effectively allowing the callback to be run immediately. 1324 f.set_result(5) 1325 f.add_done_callback(raising_fn) 1326 1327 self.assertIn('exception calling callback for', stderr.getvalue()) 1328 self.assertIn('doh!', stderr.getvalue()) 1329 1330 1331 def test_repr(self): 1332 self.assertRegex(repr(PENDING_FUTURE), 1333 '<Future at 0x[0-9a-f]+ state=pending>') 1334 self.assertRegex(repr(RUNNING_FUTURE), 1335 '<Future at 0x[0-9a-f]+ state=running>') 1336 self.assertRegex(repr(CANCELLED_FUTURE), 1337 '<Future at 0x[0-9a-f]+ state=cancelled>') 1338 self.assertRegex(repr(CANCELLED_AND_NOTIFIED_FUTURE), 1339 '<Future at 0x[0-9a-f]+ state=cancelled>') 1340 self.assertRegex( 1341 repr(EXCEPTION_FUTURE), 1342 '<Future at 0x[0-9a-f]+ state=finished raised OSError>') 1343 self.assertRegex( 1344 repr(SUCCESSFUL_FUTURE), 1345 '<Future at 0x[0-9a-f]+ state=finished returned int>') 1346 1347 1348 def test_cancel(self): 1349 f1 = create_future(state=PENDING) 1350 f2 = create_future(state=RUNNING) 1351 f3 = create_future(state=CANCELLED) 1352 f4 = create_future(state=CANCELLED_AND_NOTIFIED) 1353 f5 = create_future(state=FINISHED, exception=OSError()) 1354 f6 = create_future(state=FINISHED, result=5) 1355 1356 self.assertTrue(f1.cancel()) 1357 self.assertEqual(f1._state, CANCELLED) 1358 1359 self.assertFalse(f2.cancel()) 1360 self.assertEqual(f2._state, RUNNING) 1361 1362 self.assertTrue(f3.cancel()) 1363 self.assertEqual(f3._state, CANCELLED) 1364 1365 self.assertTrue(f4.cancel()) 1366 self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED) 1367 1368 self.assertFalse(f5.cancel()) 1369 self.assertEqual(f5._state, FINISHED) 1370 1371 self.assertFalse(f6.cancel()) 1372 self.assertEqual(f6._state, FINISHED) 1373 1374 def test_cancelled(self): 1375 self.assertFalse(PENDING_FUTURE.cancelled()) 1376 self.assertFalse(RUNNING_FUTURE.cancelled()) 1377 self.assertTrue(CANCELLED_FUTURE.cancelled()) 1378 self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled()) 1379 self.assertFalse(EXCEPTION_FUTURE.cancelled()) 1380 self.assertFalse(SUCCESSFUL_FUTURE.cancelled()) 1381 1382 def test_done(self): 1383 self.assertFalse(PENDING_FUTURE.done()) 1384 self.assertFalse(RUNNING_FUTURE.done()) 1385 self.assertTrue(CANCELLED_FUTURE.done()) 1386 self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done()) 1387 self.assertTrue(EXCEPTION_FUTURE.done()) 1388 self.assertTrue(SUCCESSFUL_FUTURE.done()) 1389 1390 def test_running(self): 1391 self.assertFalse(PENDING_FUTURE.running()) 1392 self.assertTrue(RUNNING_FUTURE.running()) 1393 self.assertFalse(CANCELLED_FUTURE.running()) 1394 self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running()) 1395 self.assertFalse(EXCEPTION_FUTURE.running()) 1396 self.assertFalse(SUCCESSFUL_FUTURE.running()) 1397 1398 def test_result_with_timeout(self): 1399 self.assertRaises(futures.TimeoutError, 1400 PENDING_FUTURE.result, timeout=0) 1401 self.assertRaises(futures.TimeoutError, 1402 RUNNING_FUTURE.result, timeout=0) 1403 self.assertRaises(futures.CancelledError, 1404 CANCELLED_FUTURE.result, timeout=0) 1405 self.assertRaises(futures.CancelledError, 1406 CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0) 1407 self.assertRaises(OSError, EXCEPTION_FUTURE.result, timeout=0) 1408 self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42) 1409 1410 def test_result_with_success(self): 1411 # TODO(brian@sweetapp.com): This test is timing dependent. 1412 def notification(): 1413 # Wait until the main thread is waiting for the result. 1414 time.sleep(1) 1415 f1.set_result(42) 1416 1417 f1 = create_future(state=PENDING) 1418 t = threading.Thread(target=notification) 1419 t.start() 1420 1421 self.assertEqual(f1.result(timeout=5), 42) 1422 t.join() 1423 1424 def test_result_with_cancel(self): 1425 # TODO(brian@sweetapp.com): This test is timing dependent. 1426 def notification(): 1427 # Wait until the main thread is waiting for the result. 1428 time.sleep(1) 1429 f1.cancel() 1430 1431 f1 = create_future(state=PENDING) 1432 t = threading.Thread(target=notification) 1433 t.start() 1434 1435 self.assertRaises(futures.CancelledError, 1436 f1.result, timeout=support.SHORT_TIMEOUT) 1437 t.join() 1438 1439 def test_exception_with_timeout(self): 1440 self.assertRaises(futures.TimeoutError, 1441 PENDING_FUTURE.exception, timeout=0) 1442 self.assertRaises(futures.TimeoutError, 1443 RUNNING_FUTURE.exception, timeout=0) 1444 self.assertRaises(futures.CancelledError, 1445 CANCELLED_FUTURE.exception, timeout=0) 1446 self.assertRaises(futures.CancelledError, 1447 CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0) 1448 self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0), 1449 OSError)) 1450 self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None) 1451 1452 def test_exception_with_success(self): 1453 def notification(): 1454 # Wait until the main thread is waiting for the exception. 1455 time.sleep(1) 1456 with f1._condition: 1457 f1._state = FINISHED 1458 f1._exception = OSError() 1459 f1._condition.notify_all() 1460 1461 f1 = create_future(state=PENDING) 1462 t = threading.Thread(target=notification) 1463 t.start() 1464 1465 self.assertTrue(isinstance(f1.exception(timeout=support.SHORT_TIMEOUT), OSError)) 1466 t.join() 1467 1468 def test_multiple_set_result(self): 1469 f = create_future(state=PENDING) 1470 f.set_result(1) 1471 1472 with self.assertRaisesRegex( 1473 futures.InvalidStateError, 1474 'FINISHED: <Future at 0x[0-9a-f]+ ' 1475 'state=finished returned int>' 1476 ): 1477 f.set_result(2) 1478 1479 self.assertTrue(f.done()) 1480 self.assertEqual(f.result(), 1) 1481 1482 def test_multiple_set_exception(self): 1483 f = create_future(state=PENDING) 1484 e = ValueError() 1485 f.set_exception(e) 1486 1487 with self.assertRaisesRegex( 1488 futures.InvalidStateError, 1489 'FINISHED: <Future at 0x[0-9a-f]+ ' 1490 'state=finished raised ValueError>' 1491 ): 1492 f.set_exception(Exception()) 1493 1494 self.assertEqual(f.exception(), e) 1495 1496 1497_threads_key = None 1498 1499def setUpModule(): 1500 global _threads_key 1501 _threads_key = support.threading_setup() 1502 1503 1504def tearDownModule(): 1505 support.threading_cleanup(*_threads_key) 1506 multiprocessing.util._cleanup_tests() 1507 1508 1509if __name__ == "__main__": 1510 unittest.main() 1511