1from test import support 2from test.support import import_helper 3from test.support import threading_helper 4 5# Skip tests if _multiprocessing wasn't built. 6import_helper.import_module('_multiprocessing') 7 8from test.support import hashlib_helper 9from test.support.script_helper import assert_python_ok 10 11import contextlib 12import itertools 13import logging 14from logging.handlers import QueueHandler 15import os 16import queue 17import sys 18import threading 19import time 20import unittest 21import weakref 22from pickle import PicklingError 23 24from concurrent import futures 25from concurrent.futures._base import ( 26 PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future, 27 BrokenExecutor) 28from concurrent.futures.process import BrokenProcessPool, _check_system_limits 29from multiprocessing import get_context 30 31import multiprocessing.process 32import multiprocessing.util 33 34 35def create_future(state=PENDING, exception=None, result=None): 36 f = Future() 37 f._state = state 38 f._exception = exception 39 f._result = result 40 return f 41 42 43PENDING_FUTURE = create_future(state=PENDING) 44RUNNING_FUTURE = create_future(state=RUNNING) 45CANCELLED_FUTURE = create_future(state=CANCELLED) 46CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED) 47EXCEPTION_FUTURE = create_future(state=FINISHED, exception=OSError()) 48SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42) 49 50INITIALIZER_STATUS = 'uninitialized' 51 52 53def mul(x, y): 54 return x * y 55 56def capture(*args, **kwargs): 57 return args, kwargs 58 59def sleep_and_raise(t): 60 time.sleep(t) 61 raise Exception('this is an exception') 62 63def sleep_and_print(t, msg): 64 time.sleep(t) 65 print(msg) 66 sys.stdout.flush() 67 68def init(x): 69 global INITIALIZER_STATUS 70 INITIALIZER_STATUS = x 71 72def get_init_status(): 73 return INITIALIZER_STATUS 74 75def init_fail(log_queue=None): 76 if log_queue is not None: 77 logger = logging.getLogger('concurrent.futures') 78 logger.addHandler(QueueHandler(log_queue)) 79 logger.setLevel('CRITICAL') 80 logger.propagate = False 81 time.sleep(0.1) # let some futures be scheduled 82 raise ValueError('error in initializer') 83 84 85class MyObject(object): 86 def my_method(self): 87 pass 88 89 90class EventfulGCObj(): 91 def __init__(self, mgr): 92 self.event = mgr.Event() 93 94 def __del__(self): 95 self.event.set() 96 97 98def make_dummy_object(_): 99 return MyObject() 100 101 102class BaseTestCase(unittest.TestCase): 103 def setUp(self): 104 self._thread_key = threading_helper.threading_setup() 105 106 def tearDown(self): 107 support.reap_children() 108 threading_helper.threading_cleanup(*self._thread_key) 109 110 111class ExecutorMixin: 112 worker_count = 5 113 executor_kwargs = {} 114 115 def setUp(self): 116 super().setUp() 117 118 self.t1 = time.monotonic() 119 if hasattr(self, "ctx"): 120 self.executor = self.executor_type( 121 max_workers=self.worker_count, 122 mp_context=self.get_context(), 123 **self.executor_kwargs) 124 else: 125 self.executor = self.executor_type( 126 max_workers=self.worker_count, 127 **self.executor_kwargs) 128 self._prime_executor() 129 130 def tearDown(self): 131 self.executor.shutdown(wait=True) 132 self.executor = None 133 134 dt = time.monotonic() - self.t1 135 if support.verbose: 136 print("%.2fs" % dt, end=' ') 137 self.assertLess(dt, 300, "synchronization issue: test lasted too long") 138 139 super().tearDown() 140 141 def get_context(self): 142 return get_context(self.ctx) 143 144 def _prime_executor(self): 145 # Make sure that the executor is ready to do work before running the 146 # tests. This should reduce the probability of timeouts in the tests. 147 futures = [self.executor.submit(time.sleep, 0.1) 148 for _ in range(self.worker_count)] 149 for f in futures: 150 f.result() 151 152 153class ThreadPoolMixin(ExecutorMixin): 154 executor_type = futures.ThreadPoolExecutor 155 156 157class ProcessPoolForkMixin(ExecutorMixin): 158 executor_type = futures.ProcessPoolExecutor 159 ctx = "fork" 160 161 def get_context(self): 162 try: 163 _check_system_limits() 164 except NotImplementedError: 165 self.skipTest("ProcessPoolExecutor unavailable on this system") 166 if sys.platform == "win32": 167 self.skipTest("require unix system") 168 return super().get_context() 169 170 171class ProcessPoolSpawnMixin(ExecutorMixin): 172 executor_type = futures.ProcessPoolExecutor 173 ctx = "spawn" 174 175 def get_context(self): 176 try: 177 _check_system_limits() 178 except NotImplementedError: 179 self.skipTest("ProcessPoolExecutor unavailable on this system") 180 return super().get_context() 181 182 183class ProcessPoolForkserverMixin(ExecutorMixin): 184 executor_type = futures.ProcessPoolExecutor 185 ctx = "forkserver" 186 187 def get_context(self): 188 try: 189 _check_system_limits() 190 except NotImplementedError: 191 self.skipTest("ProcessPoolExecutor unavailable on this system") 192 if sys.platform == "win32": 193 self.skipTest("require unix system") 194 return super().get_context() 195 196 197def create_executor_tests(mixin, bases=(BaseTestCase,), 198 executor_mixins=(ThreadPoolMixin, 199 ProcessPoolForkMixin, 200 ProcessPoolForkserverMixin, 201 ProcessPoolSpawnMixin)): 202 def strip_mixin(name): 203 if name.endswith(('Mixin', 'Tests')): 204 return name[:-5] 205 elif name.endswith('Test'): 206 return name[:-4] 207 else: 208 return name 209 210 for exe in executor_mixins: 211 name = ("%s%sTest" 212 % (strip_mixin(exe.__name__), strip_mixin(mixin.__name__))) 213 cls = type(name, (mixin,) + (exe,) + bases, {}) 214 globals()[name] = cls 215 216 217class InitializerMixin(ExecutorMixin): 218 worker_count = 2 219 220 def setUp(self): 221 global INITIALIZER_STATUS 222 INITIALIZER_STATUS = 'uninitialized' 223 self.executor_kwargs = dict(initializer=init, 224 initargs=('initialized',)) 225 super().setUp() 226 227 def test_initializer(self): 228 futures = [self.executor.submit(get_init_status) 229 for _ in range(self.worker_count)] 230 231 for f in futures: 232 self.assertEqual(f.result(), 'initialized') 233 234 235class FailingInitializerMixin(ExecutorMixin): 236 worker_count = 2 237 238 def setUp(self): 239 if hasattr(self, "ctx"): 240 # Pass a queue to redirect the child's logging output 241 self.mp_context = self.get_context() 242 self.log_queue = self.mp_context.Queue() 243 self.executor_kwargs = dict(initializer=init_fail, 244 initargs=(self.log_queue,)) 245 else: 246 # In a thread pool, the child shares our logging setup 247 # (see _assert_logged()) 248 self.mp_context = None 249 self.log_queue = None 250 self.executor_kwargs = dict(initializer=init_fail) 251 super().setUp() 252 253 def test_initializer(self): 254 with self._assert_logged('ValueError: error in initializer'): 255 try: 256 future = self.executor.submit(get_init_status) 257 except BrokenExecutor: 258 # Perhaps the executor is already broken 259 pass 260 else: 261 with self.assertRaises(BrokenExecutor): 262 future.result() 263 # At some point, the executor should break 264 t1 = time.monotonic() 265 while not self.executor._broken: 266 if time.monotonic() - t1 > 5: 267 self.fail("executor not broken after 5 s.") 268 time.sleep(0.01) 269 # ... and from this point submit() is guaranteed to fail 270 with self.assertRaises(BrokenExecutor): 271 self.executor.submit(get_init_status) 272 273 def _prime_executor(self): 274 pass 275 276 @contextlib.contextmanager 277 def _assert_logged(self, msg): 278 if self.log_queue is not None: 279 yield 280 output = [] 281 try: 282 while True: 283 output.append(self.log_queue.get_nowait().getMessage()) 284 except queue.Empty: 285 pass 286 else: 287 with self.assertLogs('concurrent.futures', 'CRITICAL') as cm: 288 yield 289 output = cm.output 290 self.assertTrue(any(msg in line for line in output), 291 output) 292 293 294create_executor_tests(InitializerMixin) 295create_executor_tests(FailingInitializerMixin) 296 297 298class ExecutorShutdownTest: 299 def test_run_after_shutdown(self): 300 self.executor.shutdown() 301 self.assertRaises(RuntimeError, 302 self.executor.submit, 303 pow, 2, 5) 304 305 def test_interpreter_shutdown(self): 306 # Test the atexit hook for shutdown of worker threads and processes 307 rc, out, err = assert_python_ok('-c', """if 1: 308 from concurrent.futures import {executor_type} 309 from time import sleep 310 from test.test_concurrent_futures import sleep_and_print 311 if __name__ == "__main__": 312 context = '{context}' 313 if context == "": 314 t = {executor_type}(5) 315 else: 316 from multiprocessing import get_context 317 context = get_context(context) 318 t = {executor_type}(5, mp_context=context) 319 t.submit(sleep_and_print, 1.0, "apple") 320 """.format(executor_type=self.executor_type.__name__, 321 context=getattr(self, "ctx", ""))) 322 # Errors in atexit hooks don't change the process exit code, check 323 # stderr manually. 324 self.assertFalse(err) 325 self.assertEqual(out.strip(), b"apple") 326 327 def test_submit_after_interpreter_shutdown(self): 328 # Test the atexit hook for shutdown of worker threads and processes 329 rc, out, err = assert_python_ok('-c', """if 1: 330 import atexit 331 @atexit.register 332 def run_last(): 333 try: 334 t.submit(id, None) 335 except RuntimeError: 336 print("runtime-error") 337 raise 338 from concurrent.futures import {executor_type} 339 if __name__ == "__main__": 340 context = '{context}' 341 if not context: 342 t = {executor_type}(5) 343 else: 344 from multiprocessing import get_context 345 context = get_context(context) 346 t = {executor_type}(5, mp_context=context) 347 t.submit(id, 42).result() 348 """.format(executor_type=self.executor_type.__name__, 349 context=getattr(self, "ctx", ""))) 350 # Errors in atexit hooks don't change the process exit code, check 351 # stderr manually. 352 self.assertIn("RuntimeError: cannot schedule new futures", err.decode()) 353 self.assertEqual(out.strip(), b"runtime-error") 354 355 def test_hang_issue12364(self): 356 fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)] 357 self.executor.shutdown() 358 for f in fs: 359 f.result() 360 361 def test_cancel_futures(self): 362 executor = self.executor_type(max_workers=3) 363 fs = [executor.submit(time.sleep, .1) for _ in range(50)] 364 executor.shutdown(cancel_futures=True) 365 # We can't guarantee the exact number of cancellations, but we can 366 # guarantee that *some* were cancelled. With setting max_workers to 3, 367 # most of the submitted futures should have been cancelled. 368 cancelled = [fut for fut in fs if fut.cancelled()] 369 self.assertTrue(len(cancelled) >= 35, msg=f"{len(cancelled)=}") 370 371 # Ensure the other futures were able to finish. 372 # Use "not fut.cancelled()" instead of "fut.done()" to include futures 373 # that may have been left in a pending state. 374 others = [fut for fut in fs if not fut.cancelled()] 375 for fut in others: 376 self.assertTrue(fut.done(), msg=f"{fut._state=}") 377 self.assertIsNone(fut.exception()) 378 379 # Similar to the number of cancelled futures, we can't guarantee the 380 # exact number that completed. But, we can guarantee that at least 381 # one finished. 382 self.assertTrue(len(others) > 0, msg=f"{len(others)=}") 383 384 def test_hang_issue39205(self): 385 """shutdown(wait=False) doesn't hang at exit with running futures. 386 387 See https://bugs.python.org/issue39205. 388 """ 389 if self.executor_type == futures.ProcessPoolExecutor: 390 raise unittest.SkipTest( 391 "Hangs due to https://bugs.python.org/issue39205") 392 393 rc, out, err = assert_python_ok('-c', """if True: 394 from concurrent.futures import {executor_type} 395 from test.test_concurrent_futures import sleep_and_print 396 if __name__ == "__main__": 397 t = {executor_type}(max_workers=3) 398 t.submit(sleep_and_print, 1.0, "apple") 399 t.shutdown(wait=False) 400 """.format(executor_type=self.executor_type.__name__)) 401 self.assertFalse(err) 402 self.assertEqual(out.strip(), b"apple") 403 404 405class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase): 406 def _prime_executor(self): 407 pass 408 409 def test_threads_terminate(self): 410 def acquire_lock(lock): 411 lock.acquire() 412 413 sem = threading.Semaphore(0) 414 for i in range(3): 415 self.executor.submit(acquire_lock, sem) 416 self.assertEqual(len(self.executor._threads), 3) 417 for i in range(3): 418 sem.release() 419 self.executor.shutdown() 420 for t in self.executor._threads: 421 t.join() 422 423 def test_context_manager_shutdown(self): 424 with futures.ThreadPoolExecutor(max_workers=5) as e: 425 executor = e 426 self.assertEqual(list(e.map(abs, range(-5, 5))), 427 [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) 428 429 for t in executor._threads: 430 t.join() 431 432 def test_del_shutdown(self): 433 executor = futures.ThreadPoolExecutor(max_workers=5) 434 res = executor.map(abs, range(-5, 5)) 435 threads = executor._threads 436 del executor 437 438 for t in threads: 439 t.join() 440 441 # Make sure the results were all computed before the 442 # executor got shutdown. 443 assert all([r == abs(v) for r, v in zip(res, range(-5, 5))]) 444 445 def test_shutdown_no_wait(self): 446 # Ensure that the executor cleans up the threads when calling 447 # shutdown with wait=False 448 executor = futures.ThreadPoolExecutor(max_workers=5) 449 res = executor.map(abs, range(-5, 5)) 450 threads = executor._threads 451 executor.shutdown(wait=False) 452 for t in threads: 453 t.join() 454 455 # Make sure the results were all computed before the 456 # executor got shutdown. 457 assert all([r == abs(v) for r, v in zip(res, range(-5, 5))]) 458 459 460 def test_thread_names_assigned(self): 461 executor = futures.ThreadPoolExecutor( 462 max_workers=5, thread_name_prefix='SpecialPool') 463 executor.map(abs, range(-5, 5)) 464 threads = executor._threads 465 del executor 466 support.gc_collect() # For PyPy or other GCs. 467 468 for t in threads: 469 self.assertRegex(t.name, r'^SpecialPool_[0-4]$') 470 t.join() 471 472 def test_thread_names_default(self): 473 executor = futures.ThreadPoolExecutor(max_workers=5) 474 executor.map(abs, range(-5, 5)) 475 threads = executor._threads 476 del executor 477 support.gc_collect() # For PyPy or other GCs. 478 479 for t in threads: 480 # Ensure that our default name is reasonably sane and unique when 481 # no thread_name_prefix was supplied. 482 self.assertRegex(t.name, r'ThreadPoolExecutor-\d+_[0-4]$') 483 t.join() 484 485 def test_cancel_futures_wait_false(self): 486 # Can only be reliably tested for TPE, since PPE often hangs with 487 # `wait=False` (even without *cancel_futures*). 488 rc, out, err = assert_python_ok('-c', """if True: 489 from concurrent.futures import ThreadPoolExecutor 490 from test.test_concurrent_futures import sleep_and_print 491 if __name__ == "__main__": 492 t = ThreadPoolExecutor() 493 t.submit(sleep_and_print, .1, "apple") 494 t.shutdown(wait=False, cancel_futures=True) 495 """.format(executor_type=self.executor_type.__name__)) 496 # Errors in atexit hooks don't change the process exit code, check 497 # stderr manually. 498 self.assertFalse(err) 499 self.assertEqual(out.strip(), b"apple") 500 501 502class ProcessPoolShutdownTest(ExecutorShutdownTest): 503 def _prime_executor(self): 504 pass 505 506 def test_processes_terminate(self): 507 def acquire_lock(lock): 508 lock.acquire() 509 510 mp_context = get_context() 511 sem = mp_context.Semaphore(0) 512 for _ in range(3): 513 self.executor.submit(acquire_lock, sem) 514 self.assertEqual(len(self.executor._processes), 3) 515 for _ in range(3): 516 sem.release() 517 processes = self.executor._processes 518 self.executor.shutdown() 519 520 for p in processes.values(): 521 p.join() 522 523 def test_context_manager_shutdown(self): 524 with futures.ProcessPoolExecutor(max_workers=5) as e: 525 processes = e._processes 526 self.assertEqual(list(e.map(abs, range(-5, 5))), 527 [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) 528 529 for p in processes.values(): 530 p.join() 531 532 def test_del_shutdown(self): 533 executor = futures.ProcessPoolExecutor(max_workers=5) 534 res = executor.map(abs, range(-5, 5)) 535 executor_manager_thread = executor._executor_manager_thread 536 processes = executor._processes 537 call_queue = executor._call_queue 538 executor_manager_thread = executor._executor_manager_thread 539 del executor 540 support.gc_collect() # For PyPy or other GCs. 541 542 # Make sure that all the executor resources were properly cleaned by 543 # the shutdown process 544 executor_manager_thread.join() 545 for p in processes.values(): 546 p.join() 547 call_queue.join_thread() 548 549 # Make sure the results were all computed before the 550 # executor got shutdown. 551 assert all([r == abs(v) for r, v in zip(res, range(-5, 5))]) 552 553 def test_shutdown_no_wait(self): 554 # Ensure that the executor cleans up the processes when calling 555 # shutdown with wait=False 556 executor = futures.ProcessPoolExecutor(max_workers=5) 557 res = executor.map(abs, range(-5, 5)) 558 processes = executor._processes 559 call_queue = executor._call_queue 560 executor_manager_thread = executor._executor_manager_thread 561 executor.shutdown(wait=False) 562 563 # Make sure that all the executor resources were properly cleaned by 564 # the shutdown process 565 executor_manager_thread.join() 566 for p in processes.values(): 567 p.join() 568 call_queue.join_thread() 569 570 # Make sure the results were all computed before the executor got 571 # shutdown. 572 assert all([r == abs(v) for r, v in zip(res, range(-5, 5))]) 573 574 575create_executor_tests(ProcessPoolShutdownTest, 576 executor_mixins=(ProcessPoolForkMixin, 577 ProcessPoolForkserverMixin, 578 ProcessPoolSpawnMixin)) 579 580 581class WaitTests: 582 def test_20369(self): 583 # See https://bugs.python.org/issue20369 584 future = self.executor.submit(time.sleep, 1.5) 585 done, not_done = futures.wait([future, future], 586 return_when=futures.ALL_COMPLETED) 587 self.assertEqual({future}, done) 588 self.assertEqual(set(), not_done) 589 590 591 def test_first_completed(self): 592 future1 = self.executor.submit(mul, 21, 2) 593 future2 = self.executor.submit(time.sleep, 1.5) 594 595 done, not_done = futures.wait( 596 [CANCELLED_FUTURE, future1, future2], 597 return_when=futures.FIRST_COMPLETED) 598 599 self.assertEqual(set([future1]), done) 600 self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done) 601 602 def test_first_completed_some_already_completed(self): 603 future1 = self.executor.submit(time.sleep, 1.5) 604 605 finished, pending = futures.wait( 606 [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1], 607 return_when=futures.FIRST_COMPLETED) 608 609 self.assertEqual( 610 set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]), 611 finished) 612 self.assertEqual(set([future1]), pending) 613 614 def test_first_exception(self): 615 future1 = self.executor.submit(mul, 2, 21) 616 future2 = self.executor.submit(sleep_and_raise, 1.5) 617 future3 = self.executor.submit(time.sleep, 3) 618 619 finished, pending = futures.wait( 620 [future1, future2, future3], 621 return_when=futures.FIRST_EXCEPTION) 622 623 self.assertEqual(set([future1, future2]), finished) 624 self.assertEqual(set([future3]), pending) 625 626 def test_first_exception_some_already_complete(self): 627 future1 = self.executor.submit(divmod, 21, 0) 628 future2 = self.executor.submit(time.sleep, 1.5) 629 630 finished, pending = futures.wait( 631 [SUCCESSFUL_FUTURE, 632 CANCELLED_FUTURE, 633 CANCELLED_AND_NOTIFIED_FUTURE, 634 future1, future2], 635 return_when=futures.FIRST_EXCEPTION) 636 637 self.assertEqual(set([SUCCESSFUL_FUTURE, 638 CANCELLED_AND_NOTIFIED_FUTURE, 639 future1]), finished) 640 self.assertEqual(set([CANCELLED_FUTURE, future2]), pending) 641 642 def test_first_exception_one_already_failed(self): 643 future1 = self.executor.submit(time.sleep, 2) 644 645 finished, pending = futures.wait( 646 [EXCEPTION_FUTURE, future1], 647 return_when=futures.FIRST_EXCEPTION) 648 649 self.assertEqual(set([EXCEPTION_FUTURE]), finished) 650 self.assertEqual(set([future1]), pending) 651 652 def test_all_completed(self): 653 future1 = self.executor.submit(divmod, 2, 0) 654 future2 = self.executor.submit(mul, 2, 21) 655 656 finished, pending = futures.wait( 657 [SUCCESSFUL_FUTURE, 658 CANCELLED_AND_NOTIFIED_FUTURE, 659 EXCEPTION_FUTURE, 660 future1, 661 future2], 662 return_when=futures.ALL_COMPLETED) 663 664 self.assertEqual(set([SUCCESSFUL_FUTURE, 665 CANCELLED_AND_NOTIFIED_FUTURE, 666 EXCEPTION_FUTURE, 667 future1, 668 future2]), finished) 669 self.assertEqual(set(), pending) 670 671 def test_timeout(self): 672 future1 = self.executor.submit(mul, 6, 7) 673 future2 = self.executor.submit(time.sleep, 6) 674 675 finished, pending = futures.wait( 676 [CANCELLED_AND_NOTIFIED_FUTURE, 677 EXCEPTION_FUTURE, 678 SUCCESSFUL_FUTURE, 679 future1, future2], 680 timeout=5, 681 return_when=futures.ALL_COMPLETED) 682 683 self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE, 684 EXCEPTION_FUTURE, 685 SUCCESSFUL_FUTURE, 686 future1]), finished) 687 self.assertEqual(set([future2]), pending) 688 689 690class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, BaseTestCase): 691 692 def test_pending_calls_race(self): 693 # Issue #14406: multi-threaded race condition when waiting on all 694 # futures. 695 event = threading.Event() 696 def future_func(): 697 event.wait() 698 oldswitchinterval = sys.getswitchinterval() 699 sys.setswitchinterval(1e-6) 700 try: 701 fs = {self.executor.submit(future_func) for i in range(100)} 702 event.set() 703 futures.wait(fs, return_when=futures.ALL_COMPLETED) 704 finally: 705 sys.setswitchinterval(oldswitchinterval) 706 707 708create_executor_tests(WaitTests, 709 executor_mixins=(ProcessPoolForkMixin, 710 ProcessPoolForkserverMixin, 711 ProcessPoolSpawnMixin)) 712 713 714class AsCompletedTests: 715 # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout. 716 def test_no_timeout(self): 717 future1 = self.executor.submit(mul, 2, 21) 718 future2 = self.executor.submit(mul, 7, 6) 719 720 completed = set(futures.as_completed( 721 [CANCELLED_AND_NOTIFIED_FUTURE, 722 EXCEPTION_FUTURE, 723 SUCCESSFUL_FUTURE, 724 future1, future2])) 725 self.assertEqual(set( 726 [CANCELLED_AND_NOTIFIED_FUTURE, 727 EXCEPTION_FUTURE, 728 SUCCESSFUL_FUTURE, 729 future1, future2]), 730 completed) 731 732 def test_zero_timeout(self): 733 future1 = self.executor.submit(time.sleep, 2) 734 completed_futures = set() 735 try: 736 for future in futures.as_completed( 737 [CANCELLED_AND_NOTIFIED_FUTURE, 738 EXCEPTION_FUTURE, 739 SUCCESSFUL_FUTURE, 740 future1], 741 timeout=0): 742 completed_futures.add(future) 743 except futures.TimeoutError: 744 pass 745 746 self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE, 747 EXCEPTION_FUTURE, 748 SUCCESSFUL_FUTURE]), 749 completed_futures) 750 751 def test_duplicate_futures(self): 752 # Issue 20367. Duplicate futures should not raise exceptions or give 753 # duplicate responses. 754 # Issue #31641: accept arbitrary iterables. 755 future1 = self.executor.submit(time.sleep, 2) 756 completed = [ 757 f for f in futures.as_completed(itertools.repeat(future1, 3)) 758 ] 759 self.assertEqual(len(completed), 1) 760 761 def test_free_reference_yielded_future(self): 762 # Issue #14406: Generator should not keep references 763 # to finished futures. 764 futures_list = [Future() for _ in range(8)] 765 futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED)) 766 futures_list.append(create_future(state=FINISHED, result=42)) 767 768 with self.assertRaises(futures.TimeoutError): 769 for future in futures.as_completed(futures_list, timeout=0): 770 futures_list.remove(future) 771 wr = weakref.ref(future) 772 del future 773 support.gc_collect() # For PyPy or other GCs. 774 self.assertIsNone(wr()) 775 776 futures_list[0].set_result("test") 777 for future in futures.as_completed(futures_list): 778 futures_list.remove(future) 779 wr = weakref.ref(future) 780 del future 781 support.gc_collect() # For PyPy or other GCs. 782 self.assertIsNone(wr()) 783 if futures_list: 784 futures_list[0].set_result("test") 785 786 def test_correct_timeout_exception_msg(self): 787 futures_list = [CANCELLED_AND_NOTIFIED_FUTURE, PENDING_FUTURE, 788 RUNNING_FUTURE, SUCCESSFUL_FUTURE] 789 790 with self.assertRaises(futures.TimeoutError) as cm: 791 list(futures.as_completed(futures_list, timeout=0)) 792 793 self.assertEqual(str(cm.exception), '2 (of 4) futures unfinished') 794 795 796create_executor_tests(AsCompletedTests) 797 798 799class ExecutorTest: 800 # Executor.shutdown() and context manager usage is tested by 801 # ExecutorShutdownTest. 802 def test_submit(self): 803 future = self.executor.submit(pow, 2, 8) 804 self.assertEqual(256, future.result()) 805 806 def test_submit_keyword(self): 807 future = self.executor.submit(mul, 2, y=8) 808 self.assertEqual(16, future.result()) 809 future = self.executor.submit(capture, 1, self=2, fn=3) 810 self.assertEqual(future.result(), ((1,), {'self': 2, 'fn': 3})) 811 with self.assertRaises(TypeError): 812 self.executor.submit(fn=capture, arg=1) 813 with self.assertRaises(TypeError): 814 self.executor.submit(arg=1) 815 816 def test_map(self): 817 self.assertEqual( 818 list(self.executor.map(pow, range(10), range(10))), 819 list(map(pow, range(10), range(10)))) 820 821 self.assertEqual( 822 list(self.executor.map(pow, range(10), range(10), chunksize=3)), 823 list(map(pow, range(10), range(10)))) 824 825 def test_map_exception(self): 826 i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5]) 827 self.assertEqual(i.__next__(), (0, 1)) 828 self.assertEqual(i.__next__(), (0, 1)) 829 self.assertRaises(ZeroDivisionError, i.__next__) 830 831 def test_map_timeout(self): 832 results = [] 833 try: 834 for i in self.executor.map(time.sleep, 835 [0, 0, 6], 836 timeout=5): 837 results.append(i) 838 except futures.TimeoutError: 839 pass 840 else: 841 self.fail('expected TimeoutError') 842 843 self.assertEqual([None, None], results) 844 845 def test_shutdown_race_issue12456(self): 846 # Issue #12456: race condition at shutdown where trying to post a 847 # sentinel in the call queue blocks (the queue is full while processes 848 # have exited). 849 self.executor.map(str, [2] * (self.worker_count + 1)) 850 self.executor.shutdown() 851 852 @support.cpython_only 853 def test_no_stale_references(self): 854 # Issue #16284: check that the executors don't unnecessarily hang onto 855 # references. 856 my_object = MyObject() 857 my_object_collected = threading.Event() 858 my_object_callback = weakref.ref( 859 my_object, lambda obj: my_object_collected.set()) 860 # Deliberately discarding the future. 861 self.executor.submit(my_object.my_method) 862 del my_object 863 864 collected = my_object_collected.wait(timeout=support.SHORT_TIMEOUT) 865 self.assertTrue(collected, 866 "Stale reference not collected within timeout.") 867 868 def test_max_workers_negative(self): 869 for number in (0, -1): 870 with self.assertRaisesRegex(ValueError, 871 "max_workers must be greater " 872 "than 0"): 873 self.executor_type(max_workers=number) 874 875 def test_free_reference(self): 876 # Issue #14406: Result iterator should not keep an internal 877 # reference to result objects. 878 for obj in self.executor.map(make_dummy_object, range(10)): 879 wr = weakref.ref(obj) 880 del obj 881 support.gc_collect() # For PyPy or other GCs. 882 self.assertIsNone(wr()) 883 884 885class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, BaseTestCase): 886 def test_map_submits_without_iteration(self): 887 """Tests verifying issue 11777.""" 888 finished = [] 889 def record_finished(n): 890 finished.append(n) 891 892 self.executor.map(record_finished, range(10)) 893 self.executor.shutdown(wait=True) 894 self.assertCountEqual(finished, range(10)) 895 896 def test_default_workers(self): 897 executor = self.executor_type() 898 expected = min(32, (os.cpu_count() or 1) + 4) 899 self.assertEqual(executor._max_workers, expected) 900 901 def test_saturation(self): 902 executor = self.executor_type(4) 903 def acquire_lock(lock): 904 lock.acquire() 905 906 sem = threading.Semaphore(0) 907 for i in range(15 * executor._max_workers): 908 executor.submit(acquire_lock, sem) 909 self.assertEqual(len(executor._threads), executor._max_workers) 910 for i in range(15 * executor._max_workers): 911 sem.release() 912 executor.shutdown(wait=True) 913 914 def test_idle_thread_reuse(self): 915 executor = self.executor_type() 916 executor.submit(mul, 21, 2).result() 917 executor.submit(mul, 6, 7).result() 918 executor.submit(mul, 3, 14).result() 919 self.assertEqual(len(executor._threads), 1) 920 executor.shutdown(wait=True) 921 922 @unittest.skipUnless(hasattr(os, 'register_at_fork'), 'need os.register_at_fork') 923 def test_hang_global_shutdown_lock(self): 924 # bpo-45021: _global_shutdown_lock should be reinitialized in the child 925 # process, otherwise it will never exit 926 def submit(pool): 927 pool.submit(submit, pool) 928 929 with futures.ThreadPoolExecutor(1) as pool: 930 pool.submit(submit, pool) 931 932 for _ in range(50): 933 with futures.ProcessPoolExecutor(1, mp_context=get_context('fork')) as workers: 934 workers.submit(tuple) 935 936 937class ProcessPoolExecutorTest(ExecutorTest): 938 939 @unittest.skipUnless(sys.platform=='win32', 'Windows-only process limit') 940 def test_max_workers_too_large(self): 941 with self.assertRaisesRegex(ValueError, 942 "max_workers must be <= 61"): 943 futures.ProcessPoolExecutor(max_workers=62) 944 945 def test_killed_child(self): 946 # When a child process is abruptly terminated, the whole pool gets 947 # "broken". 948 futures = [self.executor.submit(time.sleep, 3)] 949 # Get one of the processes, and terminate (kill) it 950 p = next(iter(self.executor._processes.values())) 951 p.terminate() 952 for fut in futures: 953 self.assertRaises(BrokenProcessPool, fut.result) 954 # Submitting other jobs fails as well. 955 self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8) 956 957 def test_map_chunksize(self): 958 def bad_map(): 959 list(self.executor.map(pow, range(40), range(40), chunksize=-1)) 960 961 ref = list(map(pow, range(40), range(40))) 962 self.assertEqual( 963 list(self.executor.map(pow, range(40), range(40), chunksize=6)), 964 ref) 965 self.assertEqual( 966 list(self.executor.map(pow, range(40), range(40), chunksize=50)), 967 ref) 968 self.assertEqual( 969 list(self.executor.map(pow, range(40), range(40), chunksize=40)), 970 ref) 971 self.assertRaises(ValueError, bad_map) 972 973 @classmethod 974 def _test_traceback(cls): 975 raise RuntimeError(123) # some comment 976 977 def test_traceback(self): 978 # We want ensure that the traceback from the child process is 979 # contained in the traceback raised in the main process. 980 future = self.executor.submit(self._test_traceback) 981 with self.assertRaises(Exception) as cm: 982 future.result() 983 984 exc = cm.exception 985 self.assertIs(type(exc), RuntimeError) 986 self.assertEqual(exc.args, (123,)) 987 cause = exc.__cause__ 988 self.assertIs(type(cause), futures.process._RemoteTraceback) 989 self.assertIn('raise RuntimeError(123) # some comment', cause.tb) 990 991 with support.captured_stderr() as f1: 992 try: 993 raise exc 994 except RuntimeError: 995 sys.excepthook(*sys.exc_info()) 996 self.assertIn('raise RuntimeError(123) # some comment', 997 f1.getvalue()) 998 999 @hashlib_helper.requires_hashdigest('md5') 1000 def test_ressources_gced_in_workers(self): 1001 # Ensure that argument for a job are correctly gc-ed after the job 1002 # is finished 1003 mgr = get_context(self.ctx).Manager() 1004 obj = EventfulGCObj(mgr) 1005 future = self.executor.submit(id, obj) 1006 future.result() 1007 1008 self.assertTrue(obj.event.wait(timeout=1)) 1009 1010 # explicitly destroy the object to ensure that EventfulGCObj.__del__() 1011 # is called while manager is still running. 1012 obj = None 1013 support.gc_collect() 1014 1015 mgr.shutdown() 1016 mgr.join() 1017 1018 def test_saturation(self): 1019 executor = self.executor_type(4) 1020 mp_context = get_context() 1021 sem = mp_context.Semaphore(0) 1022 job_count = 15 * executor._max_workers 1023 try: 1024 for _ in range(job_count): 1025 executor.submit(sem.acquire) 1026 self.assertEqual(len(executor._processes), executor._max_workers) 1027 for _ in range(job_count): 1028 sem.release() 1029 finally: 1030 executor.shutdown() 1031 1032 def test_idle_process_reuse_one(self): 1033 executor = self.executor_type(4) 1034 executor.submit(mul, 21, 2).result() 1035 executor.submit(mul, 6, 7).result() 1036 executor.submit(mul, 3, 14).result() 1037 self.assertEqual(len(executor._processes), 1) 1038 executor.shutdown() 1039 1040 def test_idle_process_reuse_multiple(self): 1041 executor = self.executor_type(4) 1042 executor.submit(mul, 12, 7).result() 1043 executor.submit(mul, 33, 25) 1044 executor.submit(mul, 25, 26).result() 1045 executor.submit(mul, 18, 29) 1046 self.assertLessEqual(len(executor._processes), 2) 1047 executor.shutdown() 1048 1049create_executor_tests(ProcessPoolExecutorTest, 1050 executor_mixins=(ProcessPoolForkMixin, 1051 ProcessPoolForkserverMixin, 1052 ProcessPoolSpawnMixin)) 1053 1054def _crash(delay=None): 1055 """Induces a segfault.""" 1056 if delay: 1057 time.sleep(delay) 1058 import faulthandler 1059 faulthandler.disable() 1060 faulthandler._sigsegv() 1061 1062 1063def _exit(): 1064 """Induces a sys exit with exitcode 1.""" 1065 sys.exit(1) 1066 1067 1068def _raise_error(Err): 1069 """Function that raises an Exception in process.""" 1070 raise Err() 1071 1072 1073def _raise_error_ignore_stderr(Err): 1074 """Function that raises an Exception in process and ignores stderr.""" 1075 import io 1076 sys.stderr = io.StringIO() 1077 raise Err() 1078 1079 1080def _return_instance(cls): 1081 """Function that returns a instance of cls.""" 1082 return cls() 1083 1084 1085class CrashAtPickle(object): 1086 """Bad object that triggers a segfault at pickling time.""" 1087 def __reduce__(self): 1088 _crash() 1089 1090 1091class CrashAtUnpickle(object): 1092 """Bad object that triggers a segfault at unpickling time.""" 1093 def __reduce__(self): 1094 return _crash, () 1095 1096 1097class ExitAtPickle(object): 1098 """Bad object that triggers a process exit at pickling time.""" 1099 def __reduce__(self): 1100 _exit() 1101 1102 1103class ExitAtUnpickle(object): 1104 """Bad object that triggers a process exit at unpickling time.""" 1105 def __reduce__(self): 1106 return _exit, () 1107 1108 1109class ErrorAtPickle(object): 1110 """Bad object that triggers an error at pickling time.""" 1111 def __reduce__(self): 1112 from pickle import PicklingError 1113 raise PicklingError("Error in pickle") 1114 1115 1116class ErrorAtUnpickle(object): 1117 """Bad object that triggers an error at unpickling time.""" 1118 def __reduce__(self): 1119 from pickle import UnpicklingError 1120 return _raise_error_ignore_stderr, (UnpicklingError, ) 1121 1122 1123class ExecutorDeadlockTest: 1124 TIMEOUT = support.SHORT_TIMEOUT 1125 1126 def _fail_on_deadlock(self, executor): 1127 # If we did not recover before TIMEOUT seconds, consider that the 1128 # executor is in a deadlock state and forcefully clean all its 1129 # composants. 1130 import faulthandler 1131 from tempfile import TemporaryFile 1132 with TemporaryFile(mode="w+") as f: 1133 faulthandler.dump_traceback(file=f) 1134 f.seek(0) 1135 tb = f.read() 1136 for p in executor._processes.values(): 1137 p.terminate() 1138 # This should be safe to call executor.shutdown here as all possible 1139 # deadlocks should have been broken. 1140 executor.shutdown(wait=True) 1141 print(f"\nTraceback:\n {tb}", file=sys.__stderr__) 1142 self.fail(f"Executor deadlock:\n\n{tb}") 1143 1144 1145 def _check_crash(self, error, func, *args, ignore_stderr=False): 1146 # test for deadlock caused by crashes in a pool 1147 self.executor.shutdown(wait=True) 1148 1149 executor = self.executor_type( 1150 max_workers=2, mp_context=get_context(self.ctx)) 1151 res = executor.submit(func, *args) 1152 1153 if ignore_stderr: 1154 cm = support.captured_stderr() 1155 else: 1156 cm = contextlib.nullcontext() 1157 1158 try: 1159 with self.assertRaises(error): 1160 with cm: 1161 res.result(timeout=self.TIMEOUT) 1162 except futures.TimeoutError: 1163 # If we did not recover before TIMEOUT seconds, 1164 # consider that the executor is in a deadlock state 1165 self._fail_on_deadlock(executor) 1166 executor.shutdown(wait=True) 1167 1168 def test_error_at_task_pickle(self): 1169 # Check problem occurring while pickling a task in 1170 # the task_handler thread 1171 self._check_crash(PicklingError, id, ErrorAtPickle()) 1172 1173 def test_exit_at_task_unpickle(self): 1174 # Check problem occurring while unpickling a task on workers 1175 self._check_crash(BrokenProcessPool, id, ExitAtUnpickle()) 1176 1177 def test_error_at_task_unpickle(self): 1178 # Check problem occurring while unpickling a task on workers 1179 self._check_crash(BrokenProcessPool, id, ErrorAtUnpickle()) 1180 1181 def test_crash_at_task_unpickle(self): 1182 # Check problem occurring while unpickling a task on workers 1183 self._check_crash(BrokenProcessPool, id, CrashAtUnpickle()) 1184 1185 def test_crash_during_func_exec_on_worker(self): 1186 # Check problem occurring during func execution on workers 1187 self._check_crash(BrokenProcessPool, _crash) 1188 1189 def test_exit_during_func_exec_on_worker(self): 1190 # Check problem occurring during func execution on workers 1191 self._check_crash(SystemExit, _exit) 1192 1193 def test_error_during_func_exec_on_worker(self): 1194 # Check problem occurring during func execution on workers 1195 self._check_crash(RuntimeError, _raise_error, RuntimeError) 1196 1197 def test_crash_during_result_pickle_on_worker(self): 1198 # Check problem occurring while pickling a task result 1199 # on workers 1200 self._check_crash(BrokenProcessPool, _return_instance, CrashAtPickle) 1201 1202 def test_exit_during_result_pickle_on_worker(self): 1203 # Check problem occurring while pickling a task result 1204 # on workers 1205 self._check_crash(SystemExit, _return_instance, ExitAtPickle) 1206 1207 def test_error_during_result_pickle_on_worker(self): 1208 # Check problem occurring while pickling a task result 1209 # on workers 1210 self._check_crash(PicklingError, _return_instance, ErrorAtPickle) 1211 1212 def test_error_during_result_unpickle_in_result_handler(self): 1213 # Check problem occurring while unpickling a task in 1214 # the result_handler thread 1215 self._check_crash(BrokenProcessPool, 1216 _return_instance, ErrorAtUnpickle, 1217 ignore_stderr=True) 1218 1219 def test_exit_during_result_unpickle_in_result_handler(self): 1220 # Check problem occurring while unpickling a task in 1221 # the result_handler thread 1222 self._check_crash(BrokenProcessPool, _return_instance, ExitAtUnpickle) 1223 1224 def test_shutdown_deadlock(self): 1225 # Test that the pool calling shutdown do not cause deadlock 1226 # if a worker fails after the shutdown call. 1227 self.executor.shutdown(wait=True) 1228 with self.executor_type(max_workers=2, 1229 mp_context=get_context(self.ctx)) as executor: 1230 self.executor = executor # Allow clean up in fail_on_deadlock 1231 f = executor.submit(_crash, delay=.1) 1232 executor.shutdown(wait=True) 1233 with self.assertRaises(BrokenProcessPool): 1234 f.result() 1235 1236 def test_shutdown_deadlock_pickle(self): 1237 # Test that the pool calling shutdown with wait=False does not cause 1238 # a deadlock if a task fails at pickle after the shutdown call. 1239 # Reported in bpo-39104. 1240 self.executor.shutdown(wait=True) 1241 with self.executor_type(max_workers=2, 1242 mp_context=get_context(self.ctx)) as executor: 1243 self.executor = executor # Allow clean up in fail_on_deadlock 1244 1245 # Start the executor and get the executor_manager_thread to collect 1246 # the threads and avoid dangling thread that should be cleaned up 1247 # asynchronously. 1248 executor.submit(id, 42).result() 1249 executor_manager = executor._executor_manager_thread 1250 1251 # Submit a task that fails at pickle and shutdown the executor 1252 # without waiting 1253 f = executor.submit(id, ErrorAtPickle()) 1254 executor.shutdown(wait=False) 1255 with self.assertRaises(PicklingError): 1256 f.result() 1257 1258 # Make sure the executor is eventually shutdown and do not leave 1259 # dangling threads 1260 executor_manager.join() 1261 1262 1263create_executor_tests(ExecutorDeadlockTest, 1264 executor_mixins=(ProcessPoolForkMixin, 1265 ProcessPoolForkserverMixin, 1266 ProcessPoolSpawnMixin)) 1267 1268 1269class FutureTests(BaseTestCase): 1270 def test_done_callback_with_result(self): 1271 callback_result = None 1272 def fn(callback_future): 1273 nonlocal callback_result 1274 callback_result = callback_future.result() 1275 1276 f = Future() 1277 f.add_done_callback(fn) 1278 f.set_result(5) 1279 self.assertEqual(5, callback_result) 1280 1281 def test_done_callback_with_exception(self): 1282 callback_exception = None 1283 def fn(callback_future): 1284 nonlocal callback_exception 1285 callback_exception = callback_future.exception() 1286 1287 f = Future() 1288 f.add_done_callback(fn) 1289 f.set_exception(Exception('test')) 1290 self.assertEqual(('test',), callback_exception.args) 1291 1292 def test_done_callback_with_cancel(self): 1293 was_cancelled = None 1294 def fn(callback_future): 1295 nonlocal was_cancelled 1296 was_cancelled = callback_future.cancelled() 1297 1298 f = Future() 1299 f.add_done_callback(fn) 1300 self.assertTrue(f.cancel()) 1301 self.assertTrue(was_cancelled) 1302 1303 def test_done_callback_raises(self): 1304 with support.captured_stderr() as stderr: 1305 raising_was_called = False 1306 fn_was_called = False 1307 1308 def raising_fn(callback_future): 1309 nonlocal raising_was_called 1310 raising_was_called = True 1311 raise Exception('doh!') 1312 1313 def fn(callback_future): 1314 nonlocal fn_was_called 1315 fn_was_called = True 1316 1317 f = Future() 1318 f.add_done_callback(raising_fn) 1319 f.add_done_callback(fn) 1320 f.set_result(5) 1321 self.assertTrue(raising_was_called) 1322 self.assertTrue(fn_was_called) 1323 self.assertIn('Exception: doh!', stderr.getvalue()) 1324 1325 def test_done_callback_already_successful(self): 1326 callback_result = None 1327 def fn(callback_future): 1328 nonlocal callback_result 1329 callback_result = callback_future.result() 1330 1331 f = Future() 1332 f.set_result(5) 1333 f.add_done_callback(fn) 1334 self.assertEqual(5, callback_result) 1335 1336 def test_done_callback_already_failed(self): 1337 callback_exception = None 1338 def fn(callback_future): 1339 nonlocal callback_exception 1340 callback_exception = callback_future.exception() 1341 1342 f = Future() 1343 f.set_exception(Exception('test')) 1344 f.add_done_callback(fn) 1345 self.assertEqual(('test',), callback_exception.args) 1346 1347 def test_done_callback_already_cancelled(self): 1348 was_cancelled = None 1349 def fn(callback_future): 1350 nonlocal was_cancelled 1351 was_cancelled = callback_future.cancelled() 1352 1353 f = Future() 1354 self.assertTrue(f.cancel()) 1355 f.add_done_callback(fn) 1356 self.assertTrue(was_cancelled) 1357 1358 def test_done_callback_raises_already_succeeded(self): 1359 with support.captured_stderr() as stderr: 1360 def raising_fn(callback_future): 1361 raise Exception('doh!') 1362 1363 f = Future() 1364 1365 # Set the result first to simulate a future that runs instantly, 1366 # effectively allowing the callback to be run immediately. 1367 f.set_result(5) 1368 f.add_done_callback(raising_fn) 1369 1370 self.assertIn('exception calling callback for', stderr.getvalue()) 1371 self.assertIn('doh!', stderr.getvalue()) 1372 1373 1374 def test_repr(self): 1375 self.assertRegex(repr(PENDING_FUTURE), 1376 '<Future at 0x[0-9a-f]+ state=pending>') 1377 self.assertRegex(repr(RUNNING_FUTURE), 1378 '<Future at 0x[0-9a-f]+ state=running>') 1379 self.assertRegex(repr(CANCELLED_FUTURE), 1380 '<Future at 0x[0-9a-f]+ state=cancelled>') 1381 self.assertRegex(repr(CANCELLED_AND_NOTIFIED_FUTURE), 1382 '<Future at 0x[0-9a-f]+ state=cancelled>') 1383 self.assertRegex( 1384 repr(EXCEPTION_FUTURE), 1385 '<Future at 0x[0-9a-f]+ state=finished raised OSError>') 1386 self.assertRegex( 1387 repr(SUCCESSFUL_FUTURE), 1388 '<Future at 0x[0-9a-f]+ state=finished returned int>') 1389 1390 1391 def test_cancel(self): 1392 f1 = create_future(state=PENDING) 1393 f2 = create_future(state=RUNNING) 1394 f3 = create_future(state=CANCELLED) 1395 f4 = create_future(state=CANCELLED_AND_NOTIFIED) 1396 f5 = create_future(state=FINISHED, exception=OSError()) 1397 f6 = create_future(state=FINISHED, result=5) 1398 1399 self.assertTrue(f1.cancel()) 1400 self.assertEqual(f1._state, CANCELLED) 1401 1402 self.assertFalse(f2.cancel()) 1403 self.assertEqual(f2._state, RUNNING) 1404 1405 self.assertTrue(f3.cancel()) 1406 self.assertEqual(f3._state, CANCELLED) 1407 1408 self.assertTrue(f4.cancel()) 1409 self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED) 1410 1411 self.assertFalse(f5.cancel()) 1412 self.assertEqual(f5._state, FINISHED) 1413 1414 self.assertFalse(f6.cancel()) 1415 self.assertEqual(f6._state, FINISHED) 1416 1417 def test_cancelled(self): 1418 self.assertFalse(PENDING_FUTURE.cancelled()) 1419 self.assertFalse(RUNNING_FUTURE.cancelled()) 1420 self.assertTrue(CANCELLED_FUTURE.cancelled()) 1421 self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled()) 1422 self.assertFalse(EXCEPTION_FUTURE.cancelled()) 1423 self.assertFalse(SUCCESSFUL_FUTURE.cancelled()) 1424 1425 def test_done(self): 1426 self.assertFalse(PENDING_FUTURE.done()) 1427 self.assertFalse(RUNNING_FUTURE.done()) 1428 self.assertTrue(CANCELLED_FUTURE.done()) 1429 self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done()) 1430 self.assertTrue(EXCEPTION_FUTURE.done()) 1431 self.assertTrue(SUCCESSFUL_FUTURE.done()) 1432 1433 def test_running(self): 1434 self.assertFalse(PENDING_FUTURE.running()) 1435 self.assertTrue(RUNNING_FUTURE.running()) 1436 self.assertFalse(CANCELLED_FUTURE.running()) 1437 self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running()) 1438 self.assertFalse(EXCEPTION_FUTURE.running()) 1439 self.assertFalse(SUCCESSFUL_FUTURE.running()) 1440 1441 def test_result_with_timeout(self): 1442 self.assertRaises(futures.TimeoutError, 1443 PENDING_FUTURE.result, timeout=0) 1444 self.assertRaises(futures.TimeoutError, 1445 RUNNING_FUTURE.result, timeout=0) 1446 self.assertRaises(futures.CancelledError, 1447 CANCELLED_FUTURE.result, timeout=0) 1448 self.assertRaises(futures.CancelledError, 1449 CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0) 1450 self.assertRaises(OSError, EXCEPTION_FUTURE.result, timeout=0) 1451 self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42) 1452 1453 def test_result_with_success(self): 1454 # TODO(brian@sweetapp.com): This test is timing dependent. 1455 def notification(): 1456 # Wait until the main thread is waiting for the result. 1457 time.sleep(1) 1458 f1.set_result(42) 1459 1460 f1 = create_future(state=PENDING) 1461 t = threading.Thread(target=notification) 1462 t.start() 1463 1464 self.assertEqual(f1.result(timeout=5), 42) 1465 t.join() 1466 1467 def test_result_with_cancel(self): 1468 # TODO(brian@sweetapp.com): This test is timing dependent. 1469 def notification(): 1470 # Wait until the main thread is waiting for the result. 1471 time.sleep(1) 1472 f1.cancel() 1473 1474 f1 = create_future(state=PENDING) 1475 t = threading.Thread(target=notification) 1476 t.start() 1477 1478 self.assertRaises(futures.CancelledError, 1479 f1.result, timeout=support.SHORT_TIMEOUT) 1480 t.join() 1481 1482 def test_exception_with_timeout(self): 1483 self.assertRaises(futures.TimeoutError, 1484 PENDING_FUTURE.exception, timeout=0) 1485 self.assertRaises(futures.TimeoutError, 1486 RUNNING_FUTURE.exception, timeout=0) 1487 self.assertRaises(futures.CancelledError, 1488 CANCELLED_FUTURE.exception, timeout=0) 1489 self.assertRaises(futures.CancelledError, 1490 CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0) 1491 self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0), 1492 OSError)) 1493 self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None) 1494 1495 def test_exception_with_success(self): 1496 def notification(): 1497 # Wait until the main thread is waiting for the exception. 1498 time.sleep(1) 1499 with f1._condition: 1500 f1._state = FINISHED 1501 f1._exception = OSError() 1502 f1._condition.notify_all() 1503 1504 f1 = create_future(state=PENDING) 1505 t = threading.Thread(target=notification) 1506 t.start() 1507 1508 self.assertTrue(isinstance(f1.exception(timeout=support.SHORT_TIMEOUT), OSError)) 1509 t.join() 1510 1511 def test_multiple_set_result(self): 1512 f = create_future(state=PENDING) 1513 f.set_result(1) 1514 1515 with self.assertRaisesRegex( 1516 futures.InvalidStateError, 1517 'FINISHED: <Future at 0x[0-9a-f]+ ' 1518 'state=finished returned int>' 1519 ): 1520 f.set_result(2) 1521 1522 self.assertTrue(f.done()) 1523 self.assertEqual(f.result(), 1) 1524 1525 def test_multiple_set_exception(self): 1526 f = create_future(state=PENDING) 1527 e = ValueError() 1528 f.set_exception(e) 1529 1530 with self.assertRaisesRegex( 1531 futures.InvalidStateError, 1532 'FINISHED: <Future at 0x[0-9a-f]+ ' 1533 'state=finished raised ValueError>' 1534 ): 1535 f.set_exception(Exception()) 1536 1537 self.assertEqual(f.exception(), e) 1538 1539 1540def setUpModule(): 1541 unittest.addModuleCleanup(multiprocessing.util._cleanup_tests) 1542 thread_info = threading_helper.threading_setup() 1543 unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info) 1544 1545 1546if __name__ == "__main__": 1547 unittest.main() 1548