from test import support
from test.support import import_helper
from test.support import threading_helper

# Skip tests if _multiprocessing wasn't built.
import_helper.import_module('_multiprocessing')

from test.support import hashlib_helper
from test.support.script_helper import assert_python_ok

import contextlib
import itertools
import logging
from logging.handlers import QueueHandler
import os
import queue
import sys
import threading
import time
import unittest
import weakref
from pickle import PicklingError

from concurrent import futures
from concurrent.futures._base import (
    PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future,
    BrokenExecutor)
from concurrent.futures.process import BrokenProcessPool, _check_system_limits
from multiprocessing import get_context

import multiprocessing.process
import multiprocessing.util


if support.check_sanitizer(address=True, memory=True):
    # bpo-46633: Skip the test because it is too slow when Python is built
    # with ASAN/MSAN: between 5 and 20 minutes on GitHub Actions.
    raise unittest.SkipTest("test too slow on ASAN/MSAN build")


def create_future(state=PENDING, exception=None, result=None):
    """Return a Future whose private state fields are preset.

    The future is not attached to any executor; its ``_state``,
    ``_exception`` and ``_result`` attributes are assigned directly.
    """
    f = Future()
    f._state = state
    f._exception = exception
    f._result = result
    return f


# Pre-built futures, one per state, shared by the wait()/as_completed() tests.
PENDING_FUTURE = create_future(state=PENDING)
RUNNING_FUTURE = create_future(state=RUNNING)
CANCELLED_FUTURE = create_future(state=CANCELLED)
CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
EXCEPTION_FUTURE = create_future(state=FINISHED, exception=OSError())
SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)

# Written by init() and read back by get_init_status().
INITIALIZER_STATUS = 'uninitialized'


def mul(x, y):
    """Return x * y."""
    return x * y


def capture(*args, **kwargs):
    """Return the received positional and keyword arguments unchanged."""
    return args, kwargs


def sleep_and_raise(t):
    """Sleep for t seconds, then raise a plain Exception."""
    time.sleep(t)
    raise Exception('this is an exception')


def sleep_and_print(t, msg):
    """Sleep for t seconds, then print msg and flush stdout."""
    time.sleep(t)
    print(msg)
    sys.stdout.flush()


def init(x):
    """Executor initializer: store x in the global INITIALIZER_STATUS."""
    global INITIALIZER_STATUS
    INITIALIZER_STATUS = x


def get_init_status():
    """Return the current value of the global INITIALIZER_STATUS."""
    return INITIALIZER_STATUS


def init_fail(log_queue=None):
    """Executor initializer that always raises ValueError.

    When log_queue is given, the 'concurrent.futures' logger is redirected
    to it through a QueueHandler before the error is raised.
    """
    if log_queue is not None:
        logger = logging.getLogger('concurrent.futures')
        logger.addHandler(QueueHandler(log_queue))
        logger.setLevel('CRITICAL')
        logger.propagate = False
    time.sleep(0.1)  # let some futures be scheduled
    raise ValueError('error in initializer')


class MyObject(object):
    """Trivial object with a no-op method, used by the reference tests."""

    def my_method(self):
        pass


class EventfulGCObj():
    """Object that sets a manager-provided Event when it is finalized."""

    def __init__(self, mgr):
        self.event = mgr.Event()

    def __del__(self):
        self.event.set()


def make_dummy_object(_):
    """Return a fresh MyObject, ignoring the argument."""
    return MyObject()


class BaseTestCase(unittest.TestCase):
    """TestCase that records thread state in setUp and cleans up in tearDown."""

    def setUp(self):
        self._thread_key = threading_helper.threading_setup()

    def tearDown(self):
        support.reap_children()
        threading_helper.threading_cleanup(*self._thread_key)


class ExecutorMixin:
    """Create self.executor in setUp and shut it down in tearDown.

    Subclasses provide executor_type and, for process pools, a ctx
    attribute naming the multiprocessing start method.
    """
    worker_count = 5
    executor_kwargs = {}

    def setUp(self):
        super().setUp()

        self.t1 = time.monotonic()
        if hasattr(self, "ctx"):
            self.executor = self.executor_type(
                max_workers=self.worker_count,
                mp_context=self.get_context(),
                **self.executor_kwargs)
        else:
            self.executor = self.executor_type(
                max_workers=self.worker_count,
                **self.executor_kwargs)
        self._prime_executor()

    def tearDown(self):
        self.executor.shutdown(wait=True)
        self.executor = None

        dt = time.monotonic() - self.t1
        if support.verbose:
            print("%.2fs" % dt, end=' ')
        self.assertLess(dt, 300, "synchronization issue: test lasted too long")

        super().tearDown()

    def get_context(self):
        return get_context(self.ctx)

    def _prime_executor(self):
        # Make sure that the executor is ready to do work before running the
        # tests. This should reduce the probability of timeouts in the tests.
        # (The local is named "fs" so that it does not shadow the imported
        # "futures" module.)
        fs = [self.executor.submit(time.sleep, 0.1)
              for _ in range(self.worker_count)]
        for f in fs:
            f.result()


class ThreadPoolMixin(ExecutorMixin):
    executor_type = futures.ThreadPoolExecutor


class ProcessPoolForkMixin(ExecutorMixin):
    executor_type = futures.ProcessPoolExecutor
    ctx = "fork"

    def get_context(self):
        try:
            _check_system_limits()
        except NotImplementedError:
            self.skipTest("ProcessPoolExecutor unavailable on this system")
        if sys.platform == "win32":
            self.skipTest("require unix system")
        return super().get_context()


class ProcessPoolSpawnMixin(ExecutorMixin):
    executor_type = futures.ProcessPoolExecutor
    ctx = "spawn"

    def get_context(self):
        try:
            _check_system_limits()
        except NotImplementedError:
            self.skipTest("ProcessPoolExecutor unavailable on this system")
        return super().get_context()


class ProcessPoolForkserverMixin(ExecutorMixin):
    executor_type = futures.ProcessPoolExecutor
    ctx = "forkserver"

    def get_context(self):
        try:
            _check_system_limits()
        except NotImplementedError:
            self.skipTest("ProcessPoolExecutor unavailable on this system")
        if sys.platform == "win32":
            self.skipTest("require unix system")
        return super().get_context()


def create_executor_tests(mixin, bases=(BaseTestCase,),
                          executor_mixins=(ThreadPoolMixin,
                                           ProcessPoolForkMixin,
                                           ProcessPoolForkserverMixin,
                                           ProcessPoolSpawnMixin)):
    """Generate one concrete test class per executor mixin.

    Each class is named "<Executor><Mixin>Test" (with 'Mixin'/'Tests'/'Test'
    suffixes stripped from both parts), derives from (mixin, exe) + bases,
    and is published in this module's globals().
    """
    def strip_mixin(name):
        # 'Mixin' and 'Tests' are both five characters long.
        if name.endswith(('Mixin', 'Tests')):
            return name[:-5]
        elif name.endswith('Test'):
            return name[:-4]
        else:
            return name

    for exe in executor_mixins:
        name = ("%s%sTest"
                % (strip_mixin(exe.__name__), strip_mixin(mixin.__name__)))
        cls = type(name, (mixin,) + (exe,) + bases, {})
        globals()[name] = cls


class InitializerMixin(ExecutorMixin):
    worker_count = 2

    def setUp(self):
        global INITIALIZER_STATUS
        INITIALIZER_STATUS = 'uninitialized'
        self.executor_kwargs = dict(initializer=init,
                                    initargs=('initialized',))
        super().setUp()

    def test_initializer(self):
        fs = [self.executor.submit(get_init_status)
              for _ in range(self.worker_count)]

        for f in fs:
            self.assertEqual(f.result(), 'initialized')


class FailingInitializerMixin(ExecutorMixin):
    worker_count = 2

    def setUp(self):
        if hasattr(self, "ctx"):
            # Pass a queue to redirect the child's logging output
            self.mp_context = self.get_context()
            self.log_queue = self.mp_context.Queue()
            self.executor_kwargs = dict(initializer=init_fail,
                                        initargs=(self.log_queue,))
        else:
            # In a thread pool, the child shares our logging setup
            # (see _assert_logged())
            self.mp_context = None
            self.log_queue = None
            self.executor_kwargs = dict(initializer=init_fail)
        super().setUp()

    def test_initializer(self):
        with self._assert_logged('ValueError: error in initializer'):
            try:
                future = self.executor.submit(get_init_status)
            except BrokenExecutor:
                # Perhaps the executor is already broken
                pass
            else:
                with self.assertRaises(BrokenExecutor):
                    future.result()
            # At some point, the executor should break
            t1 = time.monotonic()
            while not self.executor._broken:
                if time.monotonic() - t1 > 5:
                    self.fail("executor not broken after 5 s.")
                time.sleep(0.01)
            # ... and from this point submit() is guaranteed to fail
            with self.assertRaises(BrokenExecutor):
                self.executor.submit(get_init_status)

    def _prime_executor(self):
        # Priming would submit work to an executor whose initializer fails.
        pass

    @contextlib.contextmanager
    def _assert_logged(self, msg):
        if self.log_queue is not None:
            yield
            # Drain every record the child pushed through the QueueHandler.
            output = []
            try:
                while True:
                    output.append(self.log_queue.get_nowait().getMessage())
            except queue.Empty:
                pass
        else:
            with self.assertLogs('concurrent.futures', 'CRITICAL') as cm:
                yield
            output = cm.output
        self.assertTrue(any(msg in line for line in output),
                        output)


create_executor_tests(InitializerMixin)
create_executor_tests(FailingInitializerMixin)


class ExecutorShutdownTest:
    def test_run_after_shutdown(self):
        self.executor.shutdown()
        self.assertRaises(RuntimeError,
                          self.executor.submit,
                          pow, 2, 5)

    def test_interpreter_shutdown(self):
        # Test the atexit hook for shutdown of worker threads and processes
        rc, out, err = assert_python_ok('-c', """if 1:
            from concurrent.futures import {executor_type}
            from time import sleep
            from test.test_concurrent_futures import sleep_and_print
            if __name__ == "__main__":
                context = '{context}'
                if context == "":
                    t = {executor_type}(5)
                else:
                    from multiprocessing import get_context
                    context = get_context(context)
                    t = {executor_type}(5, mp_context=context)
                t.submit(sleep_and_print, 1.0, "apple")
            """.format(executor_type=self.executor_type.__name__,
                       context=getattr(self, "ctx", "")))
        # Errors in atexit hooks don't change the process exit code, check
        # stderr manually.
        self.assertFalse(err)
        self.assertEqual(out.strip(), b"apple")

    def test_submit_after_interpreter_shutdown(self):
        # Test the atexit hook for shutdown of worker threads and processes
        rc, out, err = assert_python_ok('-c', """if 1:
            import atexit
            @atexit.register
            def run_last():
                try:
                    t.submit(id, None)
                except RuntimeError:
                    print("runtime-error")
                    raise
            from concurrent.futures import {executor_type}
            if __name__ == "__main__":
                context = '{context}'
                if not context:
                    t = {executor_type}(5)
                else:
                    from multiprocessing import get_context
                    context = get_context(context)
                    t = {executor_type}(5, mp_context=context)
                    t.submit(id, 42).result()
            """.format(executor_type=self.executor_type.__name__,
                       context=getattr(self, "ctx", "")))
        # Errors in atexit hooks don't change the process exit code, check
        # stderr manually.
        self.assertIn("RuntimeError: cannot schedule new futures", err.decode())
        self.assertEqual(out.strip(), b"runtime-error")

    def test_hang_issue12364(self):
        fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
        self.executor.shutdown()
        for f in fs:
            f.result()

    def test_cancel_futures(self):
        executor = self.executor_type(max_workers=3)
        fs = [executor.submit(time.sleep, .1) for _ in range(50)]
        executor.shutdown(cancel_futures=True)
        # We can't guarantee the exact number of cancellations, but we can
        # guarantee that *some* were cancelled. With setting max_workers to 3,
        # most of the submitted futures should have been cancelled.
        cancelled = [fut for fut in fs if fut.cancelled()]
        self.assertGreaterEqual(len(cancelled), 35, msg=f"{len(cancelled)=}")

        # Ensure the other futures were able to finish.
        # Use "not fut.cancelled()" instead of "fut.done()" to include futures
        # that may have been left in a pending state.
        others = [fut for fut in fs if not fut.cancelled()]
        for fut in others:
            self.assertTrue(fut.done(), msg=f"{fut._state=}")
            self.assertIsNone(fut.exception())

        # Similar to the number of cancelled futures, we can't guarantee the
        # exact number that completed. But, we can guarantee that at least
        # one finished.
        self.assertGreater(len(others), 0, msg=f"{len(others)=}")

    def test_hang_issue39205(self):
        """shutdown(wait=False) doesn't hang at exit with running futures.

        See https://bugs.python.org/issue39205.
        """
        if self.executor_type == futures.ProcessPoolExecutor:
            raise unittest.SkipTest(
                "Hangs due to https://bugs.python.org/issue39205")

        rc, out, err = assert_python_ok('-c', """if True:
            from concurrent.futures import {executor_type}
            from test.test_concurrent_futures import sleep_and_print
            if __name__ == "__main__":
                t = {executor_type}(max_workers=3)
                t.submit(sleep_and_print, 1.0, "apple")
                t.shutdown(wait=False)
            """.format(executor_type=self.executor_type.__name__))
        self.assertFalse(err)
        self.assertEqual(out.strip(), b"apple")


class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase):
    def _prime_executor(self):
        pass

    def test_threads_terminate(self):
        def acquire_lock(lock):
            lock.acquire()

        sem = threading.Semaphore(0)
        for i in range(3):
            self.executor.submit(acquire_lock, sem)
        self.assertEqual(len(self.executor._threads), 3)
        for i in range(3):
            sem.release()
        self.executor.shutdown()
        for t in self.executor._threads:
            t.join()

    def test_context_manager_shutdown(self):
        with futures.ThreadPoolExecutor(max_workers=5) as e:
            executor = e
            self.assertEqual(list(e.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        for t in executor._threads:
            t.join()

    def test_del_shutdown(self):
        executor = futures.ThreadPoolExecutor(max_workers=5)
        res = executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor

        for t in threads:
            t.join()

        # Make sure the results were all computed before the
        # executor got shutdown.  A unittest assertion is used rather than a
        # bare "assert" so the check still runs under "python -O".
        self.assertEqual(list(res), [abs(v) for v in range(-5, 5)])

    def test_shutdown_no_wait(self):
        # Ensure that the executor cleans up the threads when calling
        # shutdown with wait=False
        executor = futures.ThreadPoolExecutor(max_workers=5)
        res = executor.map(abs, range(-5, 5))
        threads = executor._threads
        executor.shutdown(wait=False)
        for t in threads:
            t.join()

        # Make sure the results were all computed before the
        # executor got shutdown.  A unittest assertion is used rather than a
        # bare "assert" so the check still runs under "python -O".
        self.assertEqual(list(res), [abs(v) for v in range(-5, 5)])

    def test_thread_names_assigned(self):
        executor = futures.ThreadPoolExecutor(
            max_workers=5, thread_name_prefix='SpecialPool')
        executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor
        support.gc_collect()  # For PyPy or other GCs.

        for t in threads:
            self.assertRegex(t.name, r'^SpecialPool_[0-4]$')
            t.join()

    def test_thread_names_default(self):
        executor = futures.ThreadPoolExecutor(max_workers=5)
        executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor
        support.gc_collect()  # For PyPy or other GCs.

        for t in threads:
            # Ensure that our default name is reasonably sane and unique when
            # no thread_name_prefix was supplied.
            self.assertRegex(t.name, r'ThreadPoolExecutor-\d+_[0-4]$')
            t.join()

    def test_cancel_futures_wait_false(self):
        # Can only be reliably tested for TPE, since PPE often hangs with
        # `wait=False` (even without *cancel_futures*).
        # The script has no placeholders, so it is passed as a plain literal.
        rc, out, err = assert_python_ok('-c', """if True:
            from concurrent.futures import ThreadPoolExecutor
            from test.test_concurrent_futures import sleep_and_print
            if __name__ == "__main__":
                t = ThreadPoolExecutor()
                t.submit(sleep_and_print, .1, "apple")
                t.shutdown(wait=False, cancel_futures=True)
            """)
        # Errors in atexit hooks don't change the process exit code, check
        # stderr manually.
        self.assertFalse(err)
        self.assertEqual(out.strip(), b"apple")


class ProcessPoolShutdownTest(ExecutorShutdownTest):
    def _prime_executor(self):
        pass

    def test_processes_terminate(self):
        def acquire_lock(lock):
            lock.acquire()

        mp_context = get_context()
        sem = mp_context.Semaphore(0)
        for _ in range(3):
            self.executor.submit(acquire_lock, sem)
        self.assertEqual(len(self.executor._processes), 3)
        for _ in range(3):
            sem.release()
        processes = self.executor._processes
        self.executor.shutdown()

        for p in processes.values():
            p.join()

    def test_context_manager_shutdown(self):
        with futures.ProcessPoolExecutor(max_workers=5) as e:
            processes = e._processes
            self.assertEqual(list(e.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        for p in processes.values():
            p.join()

    def test_del_shutdown(self):
        executor = futures.ProcessPoolExecutor(max_workers=5)
        res = executor.map(abs, range(-5, 5))
        # Grab the internals before dropping the last executor reference.
        executor_manager_thread = executor._executor_manager_thread
        processes = executor._processes
        call_queue = executor._call_queue
        del executor
        support.gc_collect()  # For PyPy or other GCs.

        # Make sure that all the executor resources were properly cleaned by
        # the shutdown process
        executor_manager_thread.join()
        for p in processes.values():
            p.join()
        call_queue.join_thread()

        # Make sure the results were all computed before the
        # executor got shutdown.  A unittest assertion is used rather than a
        # bare "assert" so the check still runs under "python -O".
        self.assertEqual(list(res), [abs(v) for v in range(-5, 5)])

    def test_shutdown_no_wait(self):
        # Ensure that the executor cleans up the processes when calling
        # shutdown with wait=False
        executor = futures.ProcessPoolExecutor(max_workers=5)
        res = executor.map(abs, range(-5, 5))
        processes = executor._processes
        call_queue = executor._call_queue
        executor_manager_thread = executor._executor_manager_thread
        executor.shutdown(wait=False)

        # Make sure that all the executor resources were properly cleaned by
        # the shutdown process
        executor_manager_thread.join()
        for p in processes.values():
            p.join()
        call_queue.join_thread()

        # Make sure the results were all computed before the executor got
        # shutdown.  A unittest assertion is used rather than a bare "assert"
        # so the check still runs under "python -O".
        self.assertEqual(list(res), [abs(v) for v in range(-5, 5)])


create_executor_tests(ProcessPoolShutdownTest,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))


class WaitTests:
    def test_20369(self):
        # See https://bugs.python.org/issue20369
        future = self.executor.submit(time.sleep, 1.5)
        done, not_done = futures.wait([future, future],
                                      return_when=futures.ALL_COMPLETED)
        self.assertEqual({future}, done)
        self.assertEqual(set(), not_done)

    def test_first_completed(self):
        future1 = self.executor.submit(mul, 21, 2)
        future2 = self.executor.submit(time.sleep, 1.5)

        done, not_done = futures.wait(
                [CANCELLED_FUTURE, future1, future2],
                return_when=futures.FIRST_COMPLETED)

        self.assertEqual(set([future1]), done)
        self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)

    def test_first_completed_some_already_completed(self):
        future1 = self.executor.submit(time.sleep, 1.5)

        finished, pending = futures.wait(
                [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
                return_when=futures.FIRST_COMPLETED)

        self.assertEqual(
                set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
                finished)
        self.assertEqual(set([future1]), pending)

    def test_first_exception(self):
        future1 = self.executor.submit(mul, 2, 21)
        future2 = self.executor.submit(sleep_and_raise, 1.5)
        future3 = self.executor.submit(time.sleep, 3)

        finished, pending = futures.wait(
                [future1, future2, future3],
                return_when=futures.FIRST_EXCEPTION)

        self.assertEqual(set([future1, future2]), finished)
        self.assertEqual(set([future3]), pending)

    def test_first_exception_some_already_complete(self):
        future1 = self.executor.submit(divmod, 21, 0)
        future2 = self.executor.submit(time.sleep, 1.5)

        finished, pending = futures.wait(
                [SUCCESSFUL_FUTURE,
                 CANCELLED_FUTURE,
                 CANCELLED_AND_NOTIFIED_FUTURE,
                 future1, future2],
                return_when=futures.FIRST_EXCEPTION)

        self.assertEqual(set([SUCCESSFUL_FUTURE,
                              CANCELLED_AND_NOTIFIED_FUTURE,
                              future1]), finished)
        self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)

    def test_first_exception_one_already_failed(self):
        future1 = self.executor.submit(time.sleep, 2)

        finished, pending = futures.wait(
                 [EXCEPTION_FUTURE, future1],
                 return_when=futures.FIRST_EXCEPTION)

        self.assertEqual(set([EXCEPTION_FUTURE]), finished)
        self.assertEqual(set([future1]), pending)

    def test_all_completed(self):
        future1 = self.executor.submit(divmod, 2, 0)
        future2 = self.executor.submit(mul, 2, 21)

        finished, pending = futures.wait(
                [SUCCESSFUL_FUTURE,
                 CANCELLED_AND_NOTIFIED_FUTURE,
                 EXCEPTION_FUTURE,
                 future1,
                 future2],
                return_when=futures.ALL_COMPLETED)

        self.assertEqual(set([SUCCESSFUL_FUTURE,
                              CANCELLED_AND_NOTIFIED_FUTURE,
                              EXCEPTION_FUTURE,
                              future1,
                              future2]), finished)
        self.assertEqual(set(), pending)

    def test_timeout(self):
        future1 = self.executor.submit(mul, 6, 7)
        future2 = self.executor.submit(time.sleep, 6)

        finished, pending = futures.wait(
                [CANCELLED_AND_NOTIFIED_FUTURE,
                 EXCEPTION_FUTURE,
                 SUCCESSFUL_FUTURE,
                 future1, future2],
                timeout=5,
                return_when=futures.ALL_COMPLETED)

        self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
                              EXCEPTION_FUTURE,
                              SUCCESSFUL_FUTURE,
                              future1]), finished)
        self.assertEqual(set([future2]), pending)


class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, BaseTestCase):

    def test_pending_calls_race(self):
        # Issue #14406: multi-threaded race condition when waiting on all
        # futures.
        event = threading.Event()
        def future_func():
            event.wait()
        oldswitchinterval = sys.getswitchinterval()
        sys.setswitchinterval(1e-6)
        try:
            fs = {self.executor.submit(future_func) for i in range(100)}
            event.set()
            futures.wait(fs, return_when=futures.ALL_COMPLETED)
        finally:
            sys.setswitchinterval(oldswitchinterval)


create_executor_tests(WaitTests,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))


class AsCompletedTests:
    # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
    def test_no_timeout(self):
        future1 = self.executor.submit(mul, 2, 21)
        future2 = self.executor.submit(mul, 7, 6)

        completed = set(futures.as_completed(
                [CANCELLED_AND_NOTIFIED_FUTURE,
                 EXCEPTION_FUTURE,
                 SUCCESSFUL_FUTURE,
                 future1, future2]))
        self.assertEqual(set(
                [CANCELLED_AND_NOTIFIED_FUTURE,
                 EXCEPTION_FUTURE,
                 SUCCESSFUL_FUTURE,
                 future1, future2]),
                completed)

    def test_zero_timeout(self):
        future1 = self.executor.submit(time.sleep, 2)
        completed_futures = set()
        try:
            for future in futures.as_completed(
                    [CANCELLED_AND_NOTIFIED_FUTURE,
                     EXCEPTION_FUTURE,
                     SUCCESSFUL_FUTURE,
                     future1],
                    timeout=0):
                completed_futures.add(future)
        except futures.TimeoutError:
            pass

        self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
                              EXCEPTION_FUTURE,
                              SUCCESSFUL_FUTURE]),
                         completed_futures)

    def test_duplicate_futures(self):
        # Issue 20367. Duplicate futures should not raise exceptions or give
        # duplicate responses.
        # Issue #31641: accept arbitrary iterables.
        future1 = self.executor.submit(time.sleep, 2)
        completed = [
            f for f in futures.as_completed(itertools.repeat(future1, 3))
        ]
        self.assertEqual(len(completed), 1)

    def test_free_reference_yielded_future(self):
        # Issue #14406: Generator should not keep references
        # to finished futures.
        futures_list = [Future() for _ in range(8)]
        futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED))
        futures_list.append(create_future(state=FINISHED, result=42))

        with self.assertRaises(futures.TimeoutError):
            for future in futures.as_completed(futures_list, timeout=0):
                futures_list.remove(future)
                wr = weakref.ref(future)
                del future
                support.gc_collect()  # For PyPy or other GCs.
                self.assertIsNone(wr())

        futures_list[0].set_result("test")
        for future in futures.as_completed(futures_list):
            futures_list.remove(future)
            wr = weakref.ref(future)
            del future
            support.gc_collect()  # For PyPy or other GCs.
            self.assertIsNone(wr())
            if futures_list:
                futures_list[0].set_result("test")

    def test_correct_timeout_exception_msg(self):
        futures_list = [CANCELLED_AND_NOTIFIED_FUTURE, PENDING_FUTURE,
                        RUNNING_FUTURE, SUCCESSFUL_FUTURE]

        with self.assertRaises(futures.TimeoutError) as cm:
            list(futures.as_completed(futures_list, timeout=0))

        self.assertEqual(str(cm.exception), '2 (of 4) futures unfinished')


create_executor_tests(AsCompletedTests)


class ExecutorTest:
    # Executor.shutdown() and context manager usage is tested by
    # ExecutorShutdownTest.
    def test_submit(self):
        future = self.executor.submit(pow, 2, 8)
        self.assertEqual(256, future.result())

    def test_submit_keyword(self):
        future = self.executor.submit(mul, 2, y=8)
        self.assertEqual(16, future.result())
        future = self.executor.submit(capture, 1, self=2, fn=3)
        self.assertEqual(future.result(), ((1,), {'self': 2, 'fn': 3}))
        with self.assertRaises(TypeError):
            self.executor.submit(fn=capture, arg=1)
        with self.assertRaises(TypeError):
            self.executor.submit(arg=1)

    def test_map(self):
        self.assertEqual(
                list(self.executor.map(pow, range(10), range(10))),
                list(map(pow, range(10), range(10))))

        self.assertEqual(
                list(self.executor.map(pow, range(10), range(10), chunksize=3)),
                list(map(pow, range(10), range(10))))

    def test_map_exception(self):
        i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
        self.assertEqual(i.__next__(), (0, 1))
        self.assertEqual(i.__next__(), (0, 1))
        self.assertRaises(ZeroDivisionError, i.__next__)

    def test_map_timeout(self):
        results = []
        try:
            for i in self.executor.map(time.sleep,
                                       [0, 0, 6],
                                       timeout=5):
                results.append(i)
        except futures.TimeoutError:
            pass
        else:
            self.fail('expected TimeoutError')

        self.assertEqual([None, None], results)

    def test_shutdown_race_issue12456(self):
        # Issue #12456: race condition at shutdown where trying to post a
        # sentinel in the call queue blocks (the queue is full while processes
        # have exited).
        self.executor.map(str, [2] * (self.worker_count + 1))
        self.executor.shutdown()

    @support.cpython_only
    def test_no_stale_references(self):
        # Issue #16284: check that the executors don't unnecessarily hang onto
        # references.
        my_object = MyObject()
        my_object_collected = threading.Event()
        my_object_callback = weakref.ref(
            my_object, lambda obj: my_object_collected.set())
        # Deliberately discarding the future.
        self.executor.submit(my_object.my_method)
        del my_object

        collected = my_object_collected.wait(timeout=support.SHORT_TIMEOUT)
        self.assertTrue(collected,
                        "Stale reference not collected within timeout.")

    def test_max_workers_negative(self):
        for number in (0, -1):
            with self.assertRaisesRegex(ValueError,
                                        "max_workers must be greater "
                                        "than 0"):
                self.executor_type(max_workers=number)

    def test_free_reference(self):
        # Issue #14406: Result iterator should not keep an internal
        # reference to result objects.
        for obj in self.executor.map(make_dummy_object, range(10)):
            wr = weakref.ref(obj)
            del obj
            support.gc_collect()  # For PyPy or other GCs.
            self.assertIsNone(wr())


class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, BaseTestCase):
    def test_map_submits_without_iteration(self):
        """Tests verifying issue 11777."""
        finished = []
        def record_finished(n):
            finished.append(n)

        self.executor.map(record_finished, range(10))
        self.executor.shutdown(wait=True)
        self.assertCountEqual(finished, range(10))

    def test_default_workers(self):
        executor = self.executor_type()
        expected = min(32, (os.cpu_count() or 1) + 4)
        self.assertEqual(executor._max_workers, expected)

    def test_saturation(self):
        executor = self.executor_type(4)
        def acquire_lock(lock):
            lock.acquire()

        sem = threading.Semaphore(0)
        for i in range(15 * executor._max_workers):
            executor.submit(acquire_lock, sem)
        self.assertEqual(len(executor._threads), executor._max_workers)
        for i in range(15 * executor._max_workers):
            sem.release()
        executor.shutdown(wait=True)

    def test_idle_thread_reuse(self):
        executor = self.executor_type()
        executor.submit(mul, 21, 2).result()
        executor.submit(mul, 6, 7).result()
        executor.submit(mul, 3, 14).result()
        self.assertEqual(len(executor._threads), 1)
        executor.shutdown(wait=True)

    @unittest.skipUnless(hasattr(os, 'register_at_fork'), 'need os.register_at_fork')
    def test_hang_global_shutdown_lock(self):
        # bpo-45021: _global_shutdown_lock should be reinitialized in the child
        # process, otherwise it will never exit
        def submit(pool):
            pool.submit(submit, pool)

        with futures.ThreadPoolExecutor(1) as pool:
            pool.submit(submit, pool)

            for _ in range(50):
                with futures.ProcessPoolExecutor(1, mp_context=get_context('fork')) as workers:
                    workers.submit(tuple)


class ProcessPoolExecutorTest(ExecutorTest):

    @unittest.skipUnless(sys.platform=='win32', 'Windows-only process limit')
    def test_max_workers_too_large(self):
        with self.assertRaisesRegex(ValueError,
                                    "max_workers must be <= 61"):
            futures.ProcessPoolExecutor(max_workers=62)

    def test_killed_child(self):
        # When a child process is abruptly terminated, the whole pool gets
        # "broken".
        # (The local is named "fs" so that it does not shadow the imported
        # "futures" module.)
        fs = [self.executor.submit(time.sleep, 3)]
        # Get one of the processes, and terminate (kill) it
        p = next(iter(self.executor._processes.values()))
        p.terminate()
        for fut in fs:
            self.assertRaises(BrokenProcessPool, fut.result)
        # Submitting other jobs fails as well.
        self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)

    def test_map_chunksize(self):
        def bad_map():
            list(self.executor.map(pow, range(40), range(40), chunksize=-1))

        ref = list(map(pow, range(40), range(40)))
        self.assertEqual(
            list(self.executor.map(pow, range(40), range(40), chunksize=6)),
            ref)
        self.assertEqual(
            list(self.executor.map(pow, range(40), range(40), chunksize=50)),
            ref)
        self.assertEqual(
            list(self.executor.map(pow, range(40), range(40), chunksize=40)),
            ref)
        self.assertRaises(ValueError, bad_map)

    @classmethod
    def _test_traceback(cls):
        # NOTE: test_traceback() looks for the exact text of the next source
        # line in the remote traceback; do not reformat it.
        raise RuntimeError(123) # some comment

    def test_traceback(self):
        # We want to ensure that the traceback from the child process is
        # contained in the traceback raised in the main process.
        future = self.executor.submit(self._test_traceback)
        with self.assertRaises(Exception) as cm:
            future.result()

        exc = cm.exception
        self.assertIs(type(exc), RuntimeError)
        self.assertEqual(exc.args, (123,))
        cause = exc.__cause__
        self.assertIs(type(cause), futures.process._RemoteTraceback)
        self.assertIn('raise RuntimeError(123) # some comment', cause.tb)

        with support.captured_stderr() as f1:
            try:
                raise exc
            except RuntimeError:
                sys.excepthook(*sys.exc_info())
        self.assertIn('raise RuntimeError(123) # some comment',
                      f1.getvalue())

    @hashlib_helper.requires_hashdigest('md5')
    def test_ressources_gced_in_workers(self):
        # Ensure that the arguments for a job are correctly gc-ed after the
        # job is finished
        mgr = get_context(self.ctx).Manager()
        obj = EventfulGCObj(mgr)
        future = self.executor.submit(id, obj)
        future.result()

        self.assertTrue(obj.event.wait(timeout=1))

        # explicitly destroy the object to ensure that EventfulGCObj.__del__()
        # is called while manager is still running.
        obj = None
        support.gc_collect()

        mgr.shutdown()
        mgr.join()

    def test_saturation(self):
        executor = self.executor_type(4)
        mp_context = get_context()
        sem = mp_context.Semaphore(0)
        job_count = 15 * executor._max_workers
        try:
            for _ in range(job_count):
                executor.submit(sem.acquire)
            self.assertEqual(len(executor._processes), executor._max_workers)
            for _ in range(job_count):
                sem.release()
        finally:
            executor.shutdown()

    def test_idle_process_reuse_one(self):
        executor = self.executor_type(4)
        executor.submit(mul, 21, 2).result()
        executor.submit(mul, 6, 7).result()
        executor.submit(mul, 3, 14).result()
        self.assertEqual(len(executor._processes), 1)
        executor.shutdown()

    def test_idle_process_reuse_multiple(self):
        executor = self.executor_type(4)
        executor.submit(mul, 12, 7).result()
        executor.submit(mul, 33, 25)
        executor.submit(mul, 25, 26).result()
        executor.submit(mul, 18, 29)
        self.assertLessEqual(len(executor._processes), 2)
        executor.shutdown()


create_executor_tests(ProcessPoolExecutorTest,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))


def _crash(delay=None):
    """Induces a segfault."""
    if delay:
        time.sleep(delay)
    import faulthandler
    faulthandler.disable()
    faulthandler._sigsegv()


def _exit():
    """Induces a sys exit with exitcode 1."""
    sys.exit(1)


def _raise_error(Err):
    """Function that raises an Exception in process."""
    raise Err()


def _raise_error_ignore_stderr(Err):
    """Function that raises an Exception in process and ignores stderr."""
    import io
    sys.stderr = io.StringIO()
    raise Err()


def _return_instance(cls):
    """Function that returns an instance of cls."""
    return cls()


class CrashAtPickle(object):
    """Bad object that triggers a segfault at pickling time."""
    def __reduce__(self):
        _crash()


class CrashAtUnpickle(object):
    """Bad object that triggers a segfault at unpickling time."""
    def __reduce__(self):
        return _crash, ()


class ExitAtPickle(object):
    """Bad object that triggers a process exit at pickling time."""
    def __reduce__(self):
        _exit()


class ExitAtUnpickle(object):
    """Bad object that triggers a process exit at unpickling time."""
    def __reduce__(self):
        return _exit, ()


class ErrorAtPickle(object):
    """Bad object that triggers an error at pickling time."""
    def __reduce__(self):
        from pickle import PicklingError
        raise PicklingError("Error in pickle")


class ErrorAtUnpickle(object):
    """Bad object that triggers an error at unpickling time."""
    def __reduce__(self):
        from pickle import UnpicklingError
        return _raise_error_ignore_stderr, (UnpicklingError, )


class ExecutorDeadlockTest:
    TIMEOUT = support.SHORT_TIMEOUT

    def _fail_on_deadlock(self, executor):
        # If we did not recover before TIMEOUT seconds, consider that the
        # executor is in a deadlock state and forcefully clean all its
        # components.
        import faulthandler
        from tempfile import TemporaryFile
        with TemporaryFile(mode="w+") as f:
            faulthandler.dump_traceback(file=f)
            f.seek(0)
            tb = f.read()
        for p in executor._processes.values():
            p.terminate()
        # This should be safe to call executor.shutdown here as all possible
        # deadlocks should have been broken.
1146 executor.shutdown(wait=True) 1147 print(f"\nTraceback:\n {tb}", file=sys.__stderr__) 1148 self.fail(f"Executor deadlock:\n\n{tb}") 1149 1150 1151 def _check_crash(self, error, func, *args, ignore_stderr=False): 1152 # test for deadlock caused by crashes in a pool 1153 self.executor.shutdown(wait=True) 1154 1155 executor = self.executor_type( 1156 max_workers=2, mp_context=get_context(self.ctx)) 1157 res = executor.submit(func, *args) 1158 1159 if ignore_stderr: 1160 cm = support.captured_stderr() 1161 else: 1162 cm = contextlib.nullcontext() 1163 1164 try: 1165 with self.assertRaises(error): 1166 with cm: 1167 res.result(timeout=self.TIMEOUT) 1168 except futures.TimeoutError: 1169 # If we did not recover before TIMEOUT seconds, 1170 # consider that the executor is in a deadlock state 1171 self._fail_on_deadlock(executor) 1172 executor.shutdown(wait=True) 1173 1174 def test_error_at_task_pickle(self): 1175 # Check problem occurring while pickling a task in 1176 # the task_handler thread 1177 self._check_crash(PicklingError, id, ErrorAtPickle()) 1178 1179 def test_exit_at_task_unpickle(self): 1180 # Check problem occurring while unpickling a task on workers 1181 self._check_crash(BrokenProcessPool, id, ExitAtUnpickle()) 1182 1183 def test_error_at_task_unpickle(self): 1184 # Check problem occurring while unpickling a task on workers 1185 self._check_crash(BrokenProcessPool, id, ErrorAtUnpickle()) 1186 1187 def test_crash_at_task_unpickle(self): 1188 # Check problem occurring while unpickling a task on workers 1189 self._check_crash(BrokenProcessPool, id, CrashAtUnpickle()) 1190 1191 def test_crash_during_func_exec_on_worker(self): 1192 # Check problem occurring during func execution on workers 1193 self._check_crash(BrokenProcessPool, _crash) 1194 1195 def test_exit_during_func_exec_on_worker(self): 1196 # Check problem occurring during func execution on workers 1197 self._check_crash(SystemExit, _exit) 1198 1199 def test_error_during_func_exec_on_worker(self): 
1200 # Check problem occurring during func execution on workers 1201 self._check_crash(RuntimeError, _raise_error, RuntimeError) 1202 1203 def test_crash_during_result_pickle_on_worker(self): 1204 # Check problem occurring while pickling a task result 1205 # on workers 1206 self._check_crash(BrokenProcessPool, _return_instance, CrashAtPickle) 1207 1208 def test_exit_during_result_pickle_on_worker(self): 1209 # Check problem occurring while pickling a task result 1210 # on workers 1211 self._check_crash(SystemExit, _return_instance, ExitAtPickle) 1212 1213 def test_error_during_result_pickle_on_worker(self): 1214 # Check problem occurring while pickling a task result 1215 # on workers 1216 self._check_crash(PicklingError, _return_instance, ErrorAtPickle) 1217 1218 def test_error_during_result_unpickle_in_result_handler(self): 1219 # Check problem occurring while unpickling a task in 1220 # the result_handler thread 1221 self._check_crash(BrokenProcessPool, 1222 _return_instance, ErrorAtUnpickle, 1223 ignore_stderr=True) 1224 1225 def test_exit_during_result_unpickle_in_result_handler(self): 1226 # Check problem occurring while unpickling a task in 1227 # the result_handler thread 1228 self._check_crash(BrokenProcessPool, _return_instance, ExitAtUnpickle) 1229 1230 def test_shutdown_deadlock(self): 1231 # Test that the pool calling shutdown do not cause deadlock 1232 # if a worker fails after the shutdown call. 1233 self.executor.shutdown(wait=True) 1234 with self.executor_type(max_workers=2, 1235 mp_context=get_context(self.ctx)) as executor: 1236 self.executor = executor # Allow clean up in fail_on_deadlock 1237 f = executor.submit(_crash, delay=.1) 1238 executor.shutdown(wait=True) 1239 with self.assertRaises(BrokenProcessPool): 1240 f.result() 1241 1242 def test_shutdown_deadlock_pickle(self): 1243 # Test that the pool calling shutdown with wait=False does not cause 1244 # a deadlock if a task fails at pickle after the shutdown call. 1245 # Reported in bpo-39104. 
1246 self.executor.shutdown(wait=True) 1247 with self.executor_type(max_workers=2, 1248 mp_context=get_context(self.ctx)) as executor: 1249 self.executor = executor # Allow clean up in fail_on_deadlock 1250 1251 # Start the executor and get the executor_manager_thread to collect 1252 # the threads and avoid dangling thread that should be cleaned up 1253 # asynchronously. 1254 executor.submit(id, 42).result() 1255 executor_manager = executor._executor_manager_thread 1256 1257 # Submit a task that fails at pickle and shutdown the executor 1258 # without waiting 1259 f = executor.submit(id, ErrorAtPickle()) 1260 executor.shutdown(wait=False) 1261 with self.assertRaises(PicklingError): 1262 f.result() 1263 1264 # Make sure the executor is eventually shutdown and do not leave 1265 # dangling threads 1266 executor_manager.join() 1267 1268 1269create_executor_tests(ExecutorDeadlockTest, 1270 executor_mixins=(ProcessPoolForkMixin, 1271 ProcessPoolForkserverMixin, 1272 ProcessPoolSpawnMixin)) 1273 1274 1275class FutureTests(BaseTestCase): 1276 def test_done_callback_with_result(self): 1277 callback_result = None 1278 def fn(callback_future): 1279 nonlocal callback_result 1280 callback_result = callback_future.result() 1281 1282 f = Future() 1283 f.add_done_callback(fn) 1284 f.set_result(5) 1285 self.assertEqual(5, callback_result) 1286 1287 def test_done_callback_with_exception(self): 1288 callback_exception = None 1289 def fn(callback_future): 1290 nonlocal callback_exception 1291 callback_exception = callback_future.exception() 1292 1293 f = Future() 1294 f.add_done_callback(fn) 1295 f.set_exception(Exception('test')) 1296 self.assertEqual(('test',), callback_exception.args) 1297 1298 def test_done_callback_with_cancel(self): 1299 was_cancelled = None 1300 def fn(callback_future): 1301 nonlocal was_cancelled 1302 was_cancelled = callback_future.cancelled() 1303 1304 f = Future() 1305 f.add_done_callback(fn) 1306 self.assertTrue(f.cancel()) 1307 
self.assertTrue(was_cancelled) 1308 1309 def test_done_callback_raises(self): 1310 with support.captured_stderr() as stderr: 1311 raising_was_called = False 1312 fn_was_called = False 1313 1314 def raising_fn(callback_future): 1315 nonlocal raising_was_called 1316 raising_was_called = True 1317 raise Exception('doh!') 1318 1319 def fn(callback_future): 1320 nonlocal fn_was_called 1321 fn_was_called = True 1322 1323 f = Future() 1324 f.add_done_callback(raising_fn) 1325 f.add_done_callback(fn) 1326 f.set_result(5) 1327 self.assertTrue(raising_was_called) 1328 self.assertTrue(fn_was_called) 1329 self.assertIn('Exception: doh!', stderr.getvalue()) 1330 1331 def test_done_callback_already_successful(self): 1332 callback_result = None 1333 def fn(callback_future): 1334 nonlocal callback_result 1335 callback_result = callback_future.result() 1336 1337 f = Future() 1338 f.set_result(5) 1339 f.add_done_callback(fn) 1340 self.assertEqual(5, callback_result) 1341 1342 def test_done_callback_already_failed(self): 1343 callback_exception = None 1344 def fn(callback_future): 1345 nonlocal callback_exception 1346 callback_exception = callback_future.exception() 1347 1348 f = Future() 1349 f.set_exception(Exception('test')) 1350 f.add_done_callback(fn) 1351 self.assertEqual(('test',), callback_exception.args) 1352 1353 def test_done_callback_already_cancelled(self): 1354 was_cancelled = None 1355 def fn(callback_future): 1356 nonlocal was_cancelled 1357 was_cancelled = callback_future.cancelled() 1358 1359 f = Future() 1360 self.assertTrue(f.cancel()) 1361 f.add_done_callback(fn) 1362 self.assertTrue(was_cancelled) 1363 1364 def test_done_callback_raises_already_succeeded(self): 1365 with support.captured_stderr() as stderr: 1366 def raising_fn(callback_future): 1367 raise Exception('doh!') 1368 1369 f = Future() 1370 1371 # Set the result first to simulate a future that runs instantly, 1372 # effectively allowing the callback to be run immediately. 
1373 f.set_result(5) 1374 f.add_done_callback(raising_fn) 1375 1376 self.assertIn('exception calling callback for', stderr.getvalue()) 1377 self.assertIn('doh!', stderr.getvalue()) 1378 1379 1380 def test_repr(self): 1381 self.assertRegex(repr(PENDING_FUTURE), 1382 '<Future at 0x[0-9a-f]+ state=pending>') 1383 self.assertRegex(repr(RUNNING_FUTURE), 1384 '<Future at 0x[0-9a-f]+ state=running>') 1385 self.assertRegex(repr(CANCELLED_FUTURE), 1386 '<Future at 0x[0-9a-f]+ state=cancelled>') 1387 self.assertRegex(repr(CANCELLED_AND_NOTIFIED_FUTURE), 1388 '<Future at 0x[0-9a-f]+ state=cancelled>') 1389 self.assertRegex( 1390 repr(EXCEPTION_FUTURE), 1391 '<Future at 0x[0-9a-f]+ state=finished raised OSError>') 1392 self.assertRegex( 1393 repr(SUCCESSFUL_FUTURE), 1394 '<Future at 0x[0-9a-f]+ state=finished returned int>') 1395 1396 1397 def test_cancel(self): 1398 f1 = create_future(state=PENDING) 1399 f2 = create_future(state=RUNNING) 1400 f3 = create_future(state=CANCELLED) 1401 f4 = create_future(state=CANCELLED_AND_NOTIFIED) 1402 f5 = create_future(state=FINISHED, exception=OSError()) 1403 f6 = create_future(state=FINISHED, result=5) 1404 1405 self.assertTrue(f1.cancel()) 1406 self.assertEqual(f1._state, CANCELLED) 1407 1408 self.assertFalse(f2.cancel()) 1409 self.assertEqual(f2._state, RUNNING) 1410 1411 self.assertTrue(f3.cancel()) 1412 self.assertEqual(f3._state, CANCELLED) 1413 1414 self.assertTrue(f4.cancel()) 1415 self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED) 1416 1417 self.assertFalse(f5.cancel()) 1418 self.assertEqual(f5._state, FINISHED) 1419 1420 self.assertFalse(f6.cancel()) 1421 self.assertEqual(f6._state, FINISHED) 1422 1423 def test_cancelled(self): 1424 self.assertFalse(PENDING_FUTURE.cancelled()) 1425 self.assertFalse(RUNNING_FUTURE.cancelled()) 1426 self.assertTrue(CANCELLED_FUTURE.cancelled()) 1427 self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled()) 1428 self.assertFalse(EXCEPTION_FUTURE.cancelled()) 1429 
self.assertFalse(SUCCESSFUL_FUTURE.cancelled()) 1430 1431 def test_done(self): 1432 self.assertFalse(PENDING_FUTURE.done()) 1433 self.assertFalse(RUNNING_FUTURE.done()) 1434 self.assertTrue(CANCELLED_FUTURE.done()) 1435 self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done()) 1436 self.assertTrue(EXCEPTION_FUTURE.done()) 1437 self.assertTrue(SUCCESSFUL_FUTURE.done()) 1438 1439 def test_running(self): 1440 self.assertFalse(PENDING_FUTURE.running()) 1441 self.assertTrue(RUNNING_FUTURE.running()) 1442 self.assertFalse(CANCELLED_FUTURE.running()) 1443 self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running()) 1444 self.assertFalse(EXCEPTION_FUTURE.running()) 1445 self.assertFalse(SUCCESSFUL_FUTURE.running()) 1446 1447 def test_result_with_timeout(self): 1448 self.assertRaises(futures.TimeoutError, 1449 PENDING_FUTURE.result, timeout=0) 1450 self.assertRaises(futures.TimeoutError, 1451 RUNNING_FUTURE.result, timeout=0) 1452 self.assertRaises(futures.CancelledError, 1453 CANCELLED_FUTURE.result, timeout=0) 1454 self.assertRaises(futures.CancelledError, 1455 CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0) 1456 self.assertRaises(OSError, EXCEPTION_FUTURE.result, timeout=0) 1457 self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42) 1458 1459 def test_result_with_success(self): 1460 # TODO(brian@sweetapp.com): This test is timing dependent. 1461 def notification(): 1462 # Wait until the main thread is waiting for the result. 1463 time.sleep(1) 1464 f1.set_result(42) 1465 1466 f1 = create_future(state=PENDING) 1467 t = threading.Thread(target=notification) 1468 t.start() 1469 1470 self.assertEqual(f1.result(timeout=5), 42) 1471 t.join() 1472 1473 def test_result_with_cancel(self): 1474 # TODO(brian@sweetapp.com): This test is timing dependent. 1475 def notification(): 1476 # Wait until the main thread is waiting for the result. 
1477 time.sleep(1) 1478 f1.cancel() 1479 1480 f1 = create_future(state=PENDING) 1481 t = threading.Thread(target=notification) 1482 t.start() 1483 1484 self.assertRaises(futures.CancelledError, 1485 f1.result, timeout=support.SHORT_TIMEOUT) 1486 t.join() 1487 1488 def test_exception_with_timeout(self): 1489 self.assertRaises(futures.TimeoutError, 1490 PENDING_FUTURE.exception, timeout=0) 1491 self.assertRaises(futures.TimeoutError, 1492 RUNNING_FUTURE.exception, timeout=0) 1493 self.assertRaises(futures.CancelledError, 1494 CANCELLED_FUTURE.exception, timeout=0) 1495 self.assertRaises(futures.CancelledError, 1496 CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0) 1497 self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0), 1498 OSError)) 1499 self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None) 1500 1501 def test_exception_with_success(self): 1502 def notification(): 1503 # Wait until the main thread is waiting for the exception. 1504 time.sleep(1) 1505 with f1._condition: 1506 f1._state = FINISHED 1507 f1._exception = OSError() 1508 f1._condition.notify_all() 1509 1510 f1 = create_future(state=PENDING) 1511 t = threading.Thread(target=notification) 1512 t.start() 1513 1514 self.assertTrue(isinstance(f1.exception(timeout=support.SHORT_TIMEOUT), OSError)) 1515 t.join() 1516 1517 def test_multiple_set_result(self): 1518 f = create_future(state=PENDING) 1519 f.set_result(1) 1520 1521 with self.assertRaisesRegex( 1522 futures.InvalidStateError, 1523 'FINISHED: <Future at 0x[0-9a-f]+ ' 1524 'state=finished returned int>' 1525 ): 1526 f.set_result(2) 1527 1528 self.assertTrue(f.done()) 1529 self.assertEqual(f.result(), 1) 1530 1531 def test_multiple_set_exception(self): 1532 f = create_future(state=PENDING) 1533 e = ValueError() 1534 f.set_exception(e) 1535 1536 with self.assertRaisesRegex( 1537 futures.InvalidStateError, 1538 'FINISHED: <Future at 0x[0-9a-f]+ ' 1539 'state=finished raised ValueError>' 1540 ): 1541 f.set_exception(Exception()) 1542 
1543 self.assertEqual(f.exception(), e) 1544 1545 1546def setUpModule(): 1547 unittest.addModuleCleanup(multiprocessing.util._cleanup_tests) 1548 thread_info = threading_helper.threading_setup() 1549 unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info) 1550 1551 1552if __name__ == "__main__": 1553 unittest.main() 1554