1import test.support 2 3# Skip tests if _multiprocessing wasn't built. 4test.support.import_module('_multiprocessing') 5# Skip tests if sem_open implementation is broken. 6test.support.import_module('multiprocessing.synchronize') 7 8from test.support.script_helper import assert_python_ok 9 10import contextlib 11import itertools 12import logging 13from logging.handlers import QueueHandler 14import os 15import queue 16import sys 17import threading 18import time 19import unittest 20import weakref 21from pickle import PicklingError 22 23from concurrent import futures 24from concurrent.futures._base import ( 25 PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future, 26 BrokenExecutor) 27from concurrent.futures.process import BrokenProcessPool 28from multiprocessing import get_context 29 30import multiprocessing.process 31import multiprocessing.util 32 33 34def create_future(state=PENDING, exception=None, result=None): 35 f = Future() 36 f._state = state 37 f._exception = exception 38 f._result = result 39 return f 40 41 42PENDING_FUTURE = create_future(state=PENDING) 43RUNNING_FUTURE = create_future(state=RUNNING) 44CANCELLED_FUTURE = create_future(state=CANCELLED) 45CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED) 46EXCEPTION_FUTURE = create_future(state=FINISHED, exception=OSError()) 47SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42) 48 49INITIALIZER_STATUS = 'uninitialized' 50 51 52def mul(x, y): 53 return x * y 54 55def capture(*args, **kwargs): 56 return args, kwargs 57 58def sleep_and_raise(t): 59 time.sleep(t) 60 raise Exception('this is an exception') 61 62def sleep_and_print(t, msg): 63 time.sleep(t) 64 print(msg) 65 sys.stdout.flush() 66 67def init(x): 68 global INITIALIZER_STATUS 69 INITIALIZER_STATUS = x 70 71def get_init_status(): 72 return INITIALIZER_STATUS 73 74def init_fail(log_queue=None): 75 if log_queue is not None: 76 logger = logging.getLogger('concurrent.futures') 77 logger.addHandler(QueueHandler(log_queue)) 78 logger.setLevel('CRITICAL') 79 logger.propagate = False 80 time.sleep(0.1) # let some futures be scheduled 81 raise ValueError('error in initializer') 82 83 84class MyObject(object): 85 def my_method(self): 86 pass 87 88 89class EventfulGCObj(): 90 def __init__(self, ctx): 91 mgr = get_context(ctx).Manager() 92 self.event = mgr.Event() 93 94 def __del__(self): 95 self.event.set() 96 97 98def make_dummy_object(_): 99 return MyObject() 100 101 102class BaseTestCase(unittest.TestCase): 103 def setUp(self): 104 self._thread_key = test.support.threading_setup() 105 106 def tearDown(self): 107 test.support.reap_children() 108 test.support.threading_cleanup(*self._thread_key) 109 110 111class ExecutorMixin: 112 worker_count = 5 113 executor_kwargs = {} 114 115 def setUp(self): 116 super().setUp() 117 118 self.t1 = time.monotonic() 119 if hasattr(self, "ctx"): 120 self.executor = self.executor_type( 121 max_workers=self.worker_count, 122 mp_context=self.get_context(), 123 **self.executor_kwargs) 124 else: 125 self.executor = self.executor_type( 126 max_workers=self.worker_count, 127 **self.executor_kwargs) 128 self._prime_executor() 129 130 def tearDown(self): 131 self.executor.shutdown(wait=True) 132 self.executor = None 133 134 dt = time.monotonic() - self.t1 135 if test.support.verbose: 136 print("%.2fs" % dt, end=' ') 137 self.assertLess(dt, 300, "synchronization issue: test lasted too long") 138 139 super().tearDown() 140 141 def get_context(self): 142 return get_context(self.ctx) 143 144 def _prime_executor(self): 145 # Make sure that the executor is ready to do work before running the 146 # tests. This should reduce the probability of timeouts in the tests. 147 futures = [self.executor.submit(time.sleep, 0.1) 148 for _ in range(self.worker_count)] 149 for f in futures: 150 f.result() 151 152 153class ThreadPoolMixin(ExecutorMixin): 154 executor_type = futures.ThreadPoolExecutor 155 156 157class ProcessPoolForkMixin(ExecutorMixin): 158 executor_type = futures.ProcessPoolExecutor 159 ctx = "fork" 160 161 def get_context(self): 162 if sys.platform == "win32": 163 self.skipTest("require unix system") 164 return super().get_context() 165 166 167class ProcessPoolSpawnMixin(ExecutorMixin): 168 executor_type = futures.ProcessPoolExecutor 169 ctx = "spawn" 170 171 172class ProcessPoolForkserverMixin(ExecutorMixin): 173 executor_type = futures.ProcessPoolExecutor 174 ctx = "forkserver" 175 176 def get_context(self): 177 if sys.platform == "win32": 178 self.skipTest("require unix system") 179 return super().get_context() 180 181 182def create_executor_tests(mixin, bases=(BaseTestCase,), 183 executor_mixins=(ThreadPoolMixin, 184 ProcessPoolForkMixin, 185 ProcessPoolForkserverMixin, 186 ProcessPoolSpawnMixin)): 187 def strip_mixin(name): 188 if name.endswith(('Mixin', 'Tests')): 189 return name[:-5] 190 elif name.endswith('Test'): 191 return name[:-4] 192 else: 193 return name 194 195 for exe in executor_mixins: 196 name = ("%s%sTest" 197 % (strip_mixin(exe.__name__), strip_mixin(mixin.__name__))) 198 cls = type(name, (mixin,) + (exe,) + bases, {}) 199 globals()[name] = cls 200 201 202class InitializerMixin(ExecutorMixin): 203 worker_count = 2 204 205 def setUp(self): 206 global INITIALIZER_STATUS 207 INITIALIZER_STATUS = 'uninitialized' 208 self.executor_kwargs = dict(initializer=init, 209 initargs=('initialized',)) 210 super().setUp() 211 212 def test_initializer(self): 213 futures = [self.executor.submit(get_init_status) 214 for _ in range(self.worker_count)] 215 216 for f in futures: 217 self.assertEqual(f.result(), 'initialized') 218 219 220class FailingInitializerMixin(ExecutorMixin): 221 worker_count = 2 222 223 def setUp(self): 224 if hasattr(self, "ctx"): 225 # Pass a queue to redirect the child's logging output 226 self.mp_context = self.get_context() 227 self.log_queue = self.mp_context.Queue() 228 self.executor_kwargs = dict(initializer=init_fail, 229 initargs=(self.log_queue,)) 230 else: 231 # In a thread pool, the child shares our logging setup 232 # (see _assert_logged()) 233 self.mp_context = None 234 self.log_queue = None 235 self.executor_kwargs = dict(initializer=init_fail) 236 super().setUp() 237 238 def test_initializer(self): 239 with self._assert_logged('ValueError: error in initializer'): 240 try: 241 future = self.executor.submit(get_init_status) 242 except BrokenExecutor: 243 # Perhaps the executor is already broken 244 pass 245 else: 246 with self.assertRaises(BrokenExecutor): 247 future.result() 248 # At some point, the executor should break 249 t1 = time.monotonic() 250 while not self.executor._broken: 251 if time.monotonic() - t1 > 5: 252 self.fail("executor not broken after 5 s.") 253 time.sleep(0.01) 254 # ... and from this point submit() is guaranteed to fail 255 with self.assertRaises(BrokenExecutor): 256 self.executor.submit(get_init_status) 257 258 def _prime_executor(self): 259 pass 260 261 @contextlib.contextmanager 262 def _assert_logged(self, msg): 263 if self.log_queue is not None: 264 yield 265 output = [] 266 try: 267 while True: 268 output.append(self.log_queue.get_nowait().getMessage()) 269 except queue.Empty: 270 pass 271 else: 272 with self.assertLogs('concurrent.futures', 'CRITICAL') as cm: 273 yield 274 output = cm.output 275 self.assertTrue(any(msg in line for line in output), 276 output) 277 278 279create_executor_tests(InitializerMixin) 280create_executor_tests(FailingInitializerMixin) 281 282 283class ExecutorShutdownTest: 284 def test_run_after_shutdown(self): 285 self.executor.shutdown() 286 self.assertRaises(RuntimeError, 287 self.executor.submit, 288 pow, 2, 5) 289 290 def test_interpreter_shutdown(self): 291 # Test the atexit hook for shutdown of worker threads and processes 292 rc, out, err = assert_python_ok('-c', """if 1: 293 from concurrent.futures import {executor_type} 294 from time import sleep 295 from test.test_concurrent_futures import sleep_and_print 296 if __name__ == "__main__": 297 context = '{context}' 298 if context == "": 299 t = {executor_type}(5) 300 else: 301 from multiprocessing import get_context 302 context = get_context(context) 303 t = {executor_type}(5, mp_context=context) 304 t.submit(sleep_and_print, 1.0, "apple") 305 """.format(executor_type=self.executor_type.__name__, 306 context=getattr(self, "ctx", ""))) 307 # Errors in atexit hooks don't change the process exit code, check 308 # stderr manually. 309 self.assertFalse(err) 310 self.assertEqual(out.strip(), b"apple") 311 312 def test_submit_after_interpreter_shutdown(self): 313 # Test the atexit hook for shutdown of worker threads and processes 314 rc, out, err = assert_python_ok('-c', """if 1: 315 import atexit 316 @atexit.register 317 def run_last(): 318 try: 319 t.submit(id, None) 320 except RuntimeError: 321 print("runtime-error") 322 raise 323 from concurrent.futures import {executor_type} 324 if __name__ == "__main__": 325 context = '{context}' 326 if not context: 327 t = {executor_type}(5) 328 else: 329 from multiprocessing import get_context 330 context = get_context(context) 331 t = {executor_type}(5, mp_context=context) 332 t.submit(id, 42).result() 333 """.format(executor_type=self.executor_type.__name__, 334 context=getattr(self, "ctx", ""))) 335 # Errors in atexit hooks don't change the process exit code, check 336 # stderr manually. 337 self.assertIn("RuntimeError: cannot schedule new futures", err.decode()) 338 self.assertEqual(out.strip(), b"runtime-error") 339 340 def test_hang_issue12364(self): 341 fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)] 342 self.executor.shutdown() 343 for f in fs: 344 f.result() 345 346 347class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase): 348 def _prime_executor(self): 349 pass 350 351 def test_threads_terminate(self): 352 def acquire_lock(lock): 353 lock.acquire() 354 355 sem = threading.Semaphore(0) 356 for i in range(3): 357 self.executor.submit(acquire_lock, sem) 358 self.assertEqual(len(self.executor._threads), 3) 359 for i in range(3): 360 sem.release() 361 self.executor.shutdown() 362 for t in self.executor._threads: 363 t.join() 364 365 def test_context_manager_shutdown(self): 366 with futures.ThreadPoolExecutor(max_workers=5) as e: 367 executor = e 368 self.assertEqual(list(e.map(abs, range(-5, 5))), 369 [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) 370 371 for t in executor._threads: 372 t.join() 373 374 def test_del_shutdown(self): 375 executor = futures.ThreadPoolExecutor(max_workers=5) 376 executor.map(abs, range(-5, 5)) 377 threads = executor._threads 378 del executor 379 380 for t in threads: 381 t.join() 382 383 def test_thread_names_assigned(self): 384 executor = futures.ThreadPoolExecutor( 385 max_workers=5, thread_name_prefix='SpecialPool') 386 executor.map(abs, range(-5, 5)) 387 threads = executor._threads 388 del executor 389 390 for t in threads: 391 self.assertRegex(t.name, r'^SpecialPool_[0-4]$') 392 t.join() 393 394 def test_thread_names_default(self): 395 executor = futures.ThreadPoolExecutor(max_workers=5) 396 executor.map(abs, range(-5, 5)) 397 threads = executor._threads 398 del executor 399 400 for t in threads: 401 # Ensure that our default name is reasonably sane and unique when 402 # no thread_name_prefix was supplied. 403 self.assertRegex(t.name, r'ThreadPoolExecutor-\d+_[0-4]$') 404 t.join() 405 406 407class ProcessPoolShutdownTest(ExecutorShutdownTest): 408 def _prime_executor(self): 409 pass 410 411 def test_processes_terminate(self): 412 self.executor.submit(mul, 21, 2) 413 self.executor.submit(mul, 6, 7) 414 self.executor.submit(mul, 3, 14) 415 self.assertEqual(len(self.executor._processes), 5) 416 processes = self.executor._processes 417 self.executor.shutdown() 418 419 for p in processes.values(): 420 p.join() 421 422 def test_context_manager_shutdown(self): 423 with futures.ProcessPoolExecutor(max_workers=5) as e: 424 processes = e._processes 425 self.assertEqual(list(e.map(abs, range(-5, 5))), 426 [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) 427 428 for p in processes.values(): 429 p.join() 430 431 def test_del_shutdown(self): 432 executor = futures.ProcessPoolExecutor(max_workers=5) 433 list(executor.map(abs, range(-5, 5))) 434 queue_management_thread = executor._queue_management_thread 435 processes = executor._processes 436 call_queue = executor._call_queue 437 queue_management_thread = executor._queue_management_thread 438 del executor 439 440 # Make sure that all the executor resources were properly cleaned by 441 # the shutdown process 442 queue_management_thread.join() 443 for p in processes.values(): 444 p.join() 445 call_queue.join_thread() 446 447 448create_executor_tests(ProcessPoolShutdownTest, 449 executor_mixins=(ProcessPoolForkMixin, 450 ProcessPoolForkserverMixin, 451 ProcessPoolSpawnMixin)) 452 453 454class WaitTests: 455 456 def test_first_completed(self): 457 future1 = self.executor.submit(mul, 21, 2) 458 future2 = self.executor.submit(time.sleep, 1.5) 459 460 done, not_done = futures.wait( 461 [CANCELLED_FUTURE, future1, future2], 462 return_when=futures.FIRST_COMPLETED) 463 464 self.assertEqual(set([future1]), done) 465 self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done) 466 467 def test_first_completed_some_already_completed(self): 468 future1 = self.executor.submit(time.sleep, 1.5) 469 470 finished, pending = futures.wait( 471 [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1], 472 return_when=futures.FIRST_COMPLETED) 473 474 self.assertEqual( 475 set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]), 476 finished) 477 self.assertEqual(set([future1]), pending) 478 479 def test_first_exception(self): 480 future1 = self.executor.submit(mul, 2, 21) 481 future2 = self.executor.submit(sleep_and_raise, 1.5) 482 future3 = self.executor.submit(time.sleep, 3) 483 484 finished, pending = futures.wait( 485 [future1, future2, future3], 486 return_when=futures.FIRST_EXCEPTION) 487 488 self.assertEqual(set([future1, future2]), finished) 489 self.assertEqual(set([future3]), pending) 490 491 def test_first_exception_some_already_complete(self): 492 future1 = self.executor.submit(divmod, 21, 0) 493 future2 = self.executor.submit(time.sleep, 1.5) 494 495 finished, pending = futures.wait( 496 [SUCCESSFUL_FUTURE, 497 CANCELLED_FUTURE, 498 CANCELLED_AND_NOTIFIED_FUTURE, 499 future1, future2], 500 return_when=futures.FIRST_EXCEPTION) 501 502 self.assertEqual(set([SUCCESSFUL_FUTURE, 503 CANCELLED_AND_NOTIFIED_FUTURE, 504 future1]), finished) 505 self.assertEqual(set([CANCELLED_FUTURE, future2]), pending) 506 507 def test_first_exception_one_already_failed(self): 508 future1 = self.executor.submit(time.sleep, 2) 509 510 finished, pending = futures.wait( 511 [EXCEPTION_FUTURE, future1], 512 return_when=futures.FIRST_EXCEPTION) 513 514 self.assertEqual(set([EXCEPTION_FUTURE]), finished) 515 self.assertEqual(set([future1]), pending) 516 517 def test_all_completed(self): 518 future1 = self.executor.submit(divmod, 2, 0) 519 future2 = self.executor.submit(mul, 2, 21) 520 521 finished, pending = futures.wait( 522 [SUCCESSFUL_FUTURE, 523 CANCELLED_AND_NOTIFIED_FUTURE, 524 EXCEPTION_FUTURE, 525 future1, 526 future2], 527 return_when=futures.ALL_COMPLETED) 528 529 self.assertEqual(set([SUCCESSFUL_FUTURE, 530 CANCELLED_AND_NOTIFIED_FUTURE, 531 EXCEPTION_FUTURE, 532 future1, 533 future2]), finished) 534 self.assertEqual(set(), pending) 535 536 def test_timeout(self): 537 future1 = self.executor.submit(mul, 6, 7) 538 future2 = self.executor.submit(time.sleep, 6) 539 540 finished, pending = futures.wait( 541 [CANCELLED_AND_NOTIFIED_FUTURE, 542 EXCEPTION_FUTURE, 543 SUCCESSFUL_FUTURE, 544 future1, future2], 545 timeout=5, 546 return_when=futures.ALL_COMPLETED) 547 548 self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE, 549 EXCEPTION_FUTURE, 550 SUCCESSFUL_FUTURE, 551 future1]), finished) 552 self.assertEqual(set([future2]), pending) 553 554 555class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, BaseTestCase): 556 557 def test_pending_calls_race(self): 558 # Issue #14406: multi-threaded race condition when waiting on all 559 # futures. 560 event = threading.Event() 561 def future_func(): 562 event.wait() 563 oldswitchinterval = sys.getswitchinterval() 564 sys.setswitchinterval(1e-6) 565 try: 566 fs = {self.executor.submit(future_func) for i in range(100)} 567 event.set() 568 futures.wait(fs, return_when=futures.ALL_COMPLETED) 569 finally: 570 sys.setswitchinterval(oldswitchinterval) 571 572 573create_executor_tests(WaitTests, 574 executor_mixins=(ProcessPoolForkMixin, 575 ProcessPoolForkserverMixin, 576 ProcessPoolSpawnMixin)) 577 578 579class AsCompletedTests: 580 # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout. 581 def test_no_timeout(self): 582 future1 = self.executor.submit(mul, 2, 21) 583 future2 = self.executor.submit(mul, 7, 6) 584 585 completed = set(futures.as_completed( 586 [CANCELLED_AND_NOTIFIED_FUTURE, 587 EXCEPTION_FUTURE, 588 SUCCESSFUL_FUTURE, 589 future1, future2])) 590 self.assertEqual(set( 591 [CANCELLED_AND_NOTIFIED_FUTURE, 592 EXCEPTION_FUTURE, 593 SUCCESSFUL_FUTURE, 594 future1, future2]), 595 completed) 596 597 def test_zero_timeout(self): 598 future1 = self.executor.submit(time.sleep, 2) 599 completed_futures = set() 600 try: 601 for future in futures.as_completed( 602 [CANCELLED_AND_NOTIFIED_FUTURE, 603 EXCEPTION_FUTURE, 604 SUCCESSFUL_FUTURE, 605 future1], 606 timeout=0): 607 completed_futures.add(future) 608 except futures.TimeoutError: 609 pass 610 611 self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE, 612 EXCEPTION_FUTURE, 613 SUCCESSFUL_FUTURE]), 614 completed_futures) 615 616 def test_duplicate_futures(self): 617 # Issue 20367. Duplicate futures should not raise exceptions or give 618 # duplicate responses. 619 # Issue #31641: accept arbitrary iterables. 620 future1 = self.executor.submit(time.sleep, 2) 621 completed = [ 622 f for f in futures.as_completed(itertools.repeat(future1, 3)) 623 ] 624 self.assertEqual(len(completed), 1) 625 626 def test_free_reference_yielded_future(self): 627 # Issue #14406: Generator should not keep references 628 # to finished futures. 629 futures_list = [Future() for _ in range(8)] 630 futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED)) 631 futures_list.append(create_future(state=FINISHED, result=42)) 632 633 with self.assertRaises(futures.TimeoutError): 634 for future in futures.as_completed(futures_list, timeout=0): 635 futures_list.remove(future) 636 wr = weakref.ref(future) 637 del future 638 self.assertIsNone(wr()) 639 640 futures_list[0].set_result("test") 641 for future in futures.as_completed(futures_list): 642 futures_list.remove(future) 643 wr = weakref.ref(future) 644 del future 645 self.assertIsNone(wr()) 646 if futures_list: 647 futures_list[0].set_result("test") 648 649 def test_correct_timeout_exception_msg(self): 650 futures_list = [CANCELLED_AND_NOTIFIED_FUTURE, PENDING_FUTURE, 651 RUNNING_FUTURE, SUCCESSFUL_FUTURE] 652 653 with self.assertRaises(futures.TimeoutError) as cm: 654 list(futures.as_completed(futures_list, timeout=0)) 655 656 self.assertEqual(str(cm.exception), '2 (of 4) futures unfinished') 657 658 659create_executor_tests(AsCompletedTests) 660 661 662class ExecutorTest: 663 # Executor.shutdown() and context manager usage is tested by 664 # ExecutorShutdownTest. 665 def test_submit(self): 666 future = self.executor.submit(pow, 2, 8) 667 self.assertEqual(256, future.result()) 668 669 def test_submit_keyword(self): 670 future = self.executor.submit(mul, 2, y=8) 671 self.assertEqual(16, future.result()) 672 future = self.executor.submit(capture, 1, self=2, fn=3) 673 self.assertEqual(future.result(), ((1,), {'self': 2, 'fn': 3})) 674 with self.assertWarns(DeprecationWarning): 675 future = self.executor.submit(fn=capture, arg=1) 676 self.assertEqual(future.result(), ((), {'arg': 1})) 677 with self.assertRaises(TypeError): 678 self.executor.submit(arg=1) 679 680 def test_map(self): 681 self.assertEqual( 682 list(self.executor.map(pow, range(10), range(10))), 683 list(map(pow, range(10), range(10)))) 684 685 self.assertEqual( 686 list(self.executor.map(pow, range(10), range(10), chunksize=3)), 687 list(map(pow, range(10), range(10)))) 688 689 def test_map_exception(self): 690 i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5]) 691 self.assertEqual(i.__next__(), (0, 1)) 692 self.assertEqual(i.__next__(), (0, 1)) 693 self.assertRaises(ZeroDivisionError, i.__next__) 694 695 def test_map_timeout(self): 696 results = [] 697 try: 698 for i in self.executor.map(time.sleep, 699 [0, 0, 6], 700 timeout=5): 701 results.append(i) 702 except futures.TimeoutError: 703 pass 704 else: 705 self.fail('expected TimeoutError') 706 707 self.assertEqual([None, None], results) 708 709 def test_shutdown_race_issue12456(self): 710 # Issue #12456: race condition at shutdown where trying to post a 711 # sentinel in the call queue blocks (the queue is full while processes 712 # have exited). 713 self.executor.map(str, [2] * (self.worker_count + 1)) 714 self.executor.shutdown() 715 716 @test.support.cpython_only 717 def test_no_stale_references(self): 718 # Issue #16284: check that the executors don't unnecessarily hang onto 719 # references. 720 my_object = MyObject() 721 my_object_collected = threading.Event() 722 my_object_callback = weakref.ref( 723 my_object, lambda obj: my_object_collected.set()) 724 # Deliberately discarding the future. 725 self.executor.submit(my_object.my_method) 726 del my_object 727 728 collected = my_object_collected.wait(timeout=5.0) 729 self.assertTrue(collected, 730 "Stale reference not collected within timeout.") 731 732 def test_max_workers_negative(self): 733 for number in (0, -1): 734 with self.assertRaisesRegex(ValueError, 735 "max_workers must be greater " 736 "than 0"): 737 self.executor_type(max_workers=number) 738 739 def test_free_reference(self): 740 # Issue #14406: Result iterator should not keep an internal 741 # reference to result objects. 742 for obj in self.executor.map(make_dummy_object, range(10)): 743 wr = weakref.ref(obj) 744 del obj 745 self.assertIsNone(wr()) 746 747 748class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, BaseTestCase): 749 def test_map_submits_without_iteration(self): 750 """Tests verifying issue 11777.""" 751 finished = [] 752 def record_finished(n): 753 finished.append(n) 754 755 self.executor.map(record_finished, range(10)) 756 self.executor.shutdown(wait=True) 757 self.assertCountEqual(finished, range(10)) 758 759 def test_default_workers(self): 760 executor = self.executor_type() 761 expected = min(32, (os.cpu_count() or 1) + 4) 762 self.assertEqual(executor._max_workers, expected) 763 764 def test_saturation(self): 765 executor = self.executor_type(4) 766 def acquire_lock(lock): 767 lock.acquire() 768 769 sem = threading.Semaphore(0) 770 for i in range(15 * executor._max_workers): 771 executor.submit(acquire_lock, sem) 772 self.assertEqual(len(executor._threads), executor._max_workers) 773 for i in range(15 * executor._max_workers): 774 sem.release() 775 executor.shutdown(wait=True) 776 777 def test_idle_thread_reuse(self): 778 executor = self.executor_type() 779 executor.submit(mul, 21, 2).result() 780 executor.submit(mul, 6, 7).result() 781 executor.submit(mul, 3, 14).result() 782 self.assertEqual(len(executor._threads), 1) 783 executor.shutdown(wait=True) 784 785 786class ProcessPoolExecutorTest(ExecutorTest): 787 788 @unittest.skipUnless(sys.platform=='win32', 'Windows-only process limit') 789 def test_max_workers_too_large(self): 790 with self.assertRaisesRegex(ValueError, 791 "max_workers must be <= 61"): 792 futures.ProcessPoolExecutor(max_workers=62) 793 794 def test_killed_child(self): 795 # When a child process is abruptly terminated, the whole pool gets 796 # "broken". 797 futures = [self.executor.submit(time.sleep, 3)] 798 # Get one of the processes, and terminate (kill) it 799 p = next(iter(self.executor._processes.values())) 800 p.terminate() 801 for fut in futures: 802 self.assertRaises(BrokenProcessPool, fut.result) 803 # Submitting other jobs fails as well. 804 self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8) 805 806 def test_map_chunksize(self): 807 def bad_map(): 808 list(self.executor.map(pow, range(40), range(40), chunksize=-1)) 809 810 ref = list(map(pow, range(40), range(40))) 811 self.assertEqual( 812 list(self.executor.map(pow, range(40), range(40), chunksize=6)), 813 ref) 814 self.assertEqual( 815 list(self.executor.map(pow, range(40), range(40), chunksize=50)), 816 ref) 817 self.assertEqual( 818 list(self.executor.map(pow, range(40), range(40), chunksize=40)), 819 ref) 820 self.assertRaises(ValueError, bad_map) 821 822 @classmethod 823 def _test_traceback(cls): 824 raise RuntimeError(123) # some comment 825 826 def test_traceback(self): 827 # We want ensure that the traceback from the child process is 828 # contained in the traceback raised in the main process. 829 future = self.executor.submit(self._test_traceback) 830 with self.assertRaises(Exception) as cm: 831 future.result() 832 833 exc = cm.exception 834 self.assertIs(type(exc), RuntimeError) 835 self.assertEqual(exc.args, (123,)) 836 cause = exc.__cause__ 837 self.assertIs(type(cause), futures.process._RemoteTraceback) 838 self.assertIn('raise RuntimeError(123) # some comment', cause.tb) 839 840 with test.support.captured_stderr() as f1: 841 try: 842 raise exc 843 except RuntimeError: 844 sys.excepthook(*sys.exc_info()) 845 self.assertIn('raise RuntimeError(123) # some comment', 846 f1.getvalue()) 847 848 def test_ressources_gced_in_workers(self): 849 # Ensure that argument for a job are correctly gc-ed after the job 850 # is finished 851 obj = EventfulGCObj(self.ctx) 852 future = self.executor.submit(id, obj) 853 future.result() 854 855 self.assertTrue(obj.event.wait(timeout=1)) 856 857 858create_executor_tests(ProcessPoolExecutorTest, 859 executor_mixins=(ProcessPoolForkMixin, 860 ProcessPoolForkserverMixin, 861 ProcessPoolSpawnMixin)) 862 863def hide_process_stderr(): 864 import io 865 sys.stderr = io.StringIO() 866 867 868def _crash(delay=None): 869 """Induces a segfault.""" 870 if delay: 871 time.sleep(delay) 872 import faulthandler 873 faulthandler.disable() 874 faulthandler._sigsegv() 875 876 877def _exit(): 878 """Induces a sys exit with exitcode 1.""" 879 sys.exit(1) 880 881 882def _raise_error(Err): 883 """Function that raises an Exception in process.""" 884 hide_process_stderr() 885 raise Err() 886 887 888def _return_instance(cls): 889 """Function that returns a instance of cls.""" 890 hide_process_stderr() 891 return cls() 892 893 894class CrashAtPickle(object): 895 """Bad object that triggers a segfault at pickling time.""" 896 def __reduce__(self): 897 _crash() 898 899 900class CrashAtUnpickle(object): 901 """Bad object that triggers a segfault at unpickling time.""" 902 def __reduce__(self): 903 return _crash, () 904 905 906class ExitAtPickle(object): 907 """Bad object that triggers a process exit at pickling time.""" 908 def __reduce__(self): 909 _exit() 910 911 912class ExitAtUnpickle(object): 913 """Bad object that triggers a process exit at unpickling time.""" 914 def __reduce__(self): 915 return _exit, () 916 917 918class ErrorAtPickle(object): 919 """Bad object that triggers an error at pickling time.""" 920 def __reduce__(self): 921 from pickle import PicklingError 922 raise PicklingError("Error in pickle") 923 924 925class ErrorAtUnpickle(object): 926 """Bad object that triggers an error at unpickling time.""" 927 def __reduce__(self): 928 from pickle import UnpicklingError 929 return _raise_error, (UnpicklingError, ) 930 931 932class ExecutorDeadlockTest: 933 TIMEOUT = 15 934 935 @classmethod 936 def _sleep_id(cls, x, delay): 937 time.sleep(delay) 938 return x 939 940 def _fail_on_deadlock(self, executor): 941 # If we did not recover before TIMEOUT seconds, consider that the 942 # executor is in a deadlock state and forcefully clean all its 943 # composants. 944 import faulthandler 945 from tempfile import TemporaryFile 946 with TemporaryFile(mode="w+") as f: 947 faulthandler.dump_traceback(file=f) 948 f.seek(0) 949 tb = f.read() 950 for p in executor._processes.values(): 951 p.terminate() 952 # This should be safe to call executor.shutdown here as all possible 953 # deadlocks should have been broken. 954 executor.shutdown(wait=True) 955 print(f"\nTraceback:\n {tb}", file=sys.__stderr__) 956 self.fail(f"Executor deadlock:\n\n{tb}") 957 958 959 def test_crash(self): 960 # extensive testing for deadlock caused by crashes in a pool. 961 self.executor.shutdown(wait=True) 962 crash_cases = [ 963 # Check problem occurring while pickling a task in 964 # the task_handler thread 965 (id, (ErrorAtPickle(),), PicklingError, "error at task pickle"), 966 # Check problem occurring while unpickling a task on workers 967 (id, (ExitAtUnpickle(),), BrokenProcessPool, 968 "exit at task unpickle"), 969 (id, (ErrorAtUnpickle(),), BrokenProcessPool, 970 "error at task unpickle"), 971 (id, (CrashAtUnpickle(),), BrokenProcessPool, 972 "crash at task unpickle"), 973 # Check problem occurring during func execution on workers 974 (_crash, (), BrokenProcessPool, 975 "crash during func execution on worker"), 976 (_exit, (), SystemExit, 977 "exit during func execution on worker"), 978 (_raise_error, (RuntimeError, ), RuntimeError, 979 "error during func execution on worker"), 980 # Check problem occurring while pickling a task result 981 # on workers 982 (_return_instance, (CrashAtPickle,), BrokenProcessPool, 983 "crash during result pickle on worker"), 984 (_return_instance, (ExitAtPickle,), SystemExit, 985 "exit during result pickle on worker"), 986 (_return_instance, (ErrorAtPickle,), PicklingError, 987 "error during result pickle on worker"), 988 # Check problem occurring while unpickling a task in 989 # the result_handler thread 990 (_return_instance, (ErrorAtUnpickle,), BrokenProcessPool, 991 "error during result unpickle in result_handler"), 992 (_return_instance, (ExitAtUnpickle,), BrokenProcessPool, 993 "exit during result unpickle in result_handler") 994 ] 995 for func, args, error, name in crash_cases: 996 with self.subTest(name): 997 # The captured_stderr reduces the noise in the test report 998 with test.support.captured_stderr(): 999 executor = self.executor_type( 1000 max_workers=2, mp_context=get_context(self.ctx)) 1001 res = executor.submit(func, *args) 1002 with self.assertRaises(error): 1003 try: 1004 res.result(timeout=self.TIMEOUT) 1005 except futures.TimeoutError: 1006 # If we did not recover before TIMEOUT seconds, 1007 # consider that the executor is in a deadlock state 1008 self._fail_on_deadlock(executor) 1009 executor.shutdown(wait=True) 1010 1011 def test_shutdown_deadlock(self): 1012 # Test that the pool calling shutdown do not cause deadlock 1013 # if a worker fails after the shutdown call. 1014 self.executor.shutdown(wait=True) 1015 with self.executor_type(max_workers=2, 1016 mp_context=get_context(self.ctx)) as executor: 1017 self.executor = executor # Allow clean up in fail_on_deadlock 1018 f = executor.submit(_crash, delay=.1) 1019 executor.shutdown(wait=True) 1020 with self.assertRaises(BrokenProcessPool): 1021 f.result() 1022 1023 1024create_executor_tests(ExecutorDeadlockTest, 1025 executor_mixins=(ProcessPoolForkMixin, 1026 ProcessPoolForkserverMixin, 1027 ProcessPoolSpawnMixin)) 1028 1029 1030class FutureTests(BaseTestCase): 1031 def test_done_callback_with_result(self): 1032 callback_result = None 1033 def fn(callback_future): 1034 nonlocal callback_result 1035 callback_result = callback_future.result() 1036 1037 f = Future() 1038 f.add_done_callback(fn) 1039 f.set_result(5) 1040 self.assertEqual(5, callback_result) 1041 1042 def test_done_callback_with_exception(self): 1043 callback_exception = None 1044 def fn(callback_future): 1045 nonlocal callback_exception 1046 callback_exception = callback_future.exception() 1047 1048 f = Future() 1049 f.add_done_callback(fn) 1050 f.set_exception(Exception('test')) 1051 self.assertEqual(('test',), callback_exception.args) 1052 1053 def test_done_callback_with_cancel(self): 1054 was_cancelled = None 1055 def fn(callback_future): 1056 nonlocal was_cancelled 1057 was_cancelled = callback_future.cancelled() 1058 1059 f = Future() 1060 f.add_done_callback(fn) 1061 self.assertTrue(f.cancel()) 1062 self.assertTrue(was_cancelled) 1063 1064 def test_done_callback_raises(self): 1065 with test.support.captured_stderr() as stderr: 1066 raising_was_called = False 1067 fn_was_called = False 1068 1069 def raising_fn(callback_future): 1070 nonlocal raising_was_called 1071 raising_was_called = True 1072 raise Exception('doh!') 1073 1074 def fn(callback_future): 1075 nonlocal fn_was_called 1076 fn_was_called = True 1077 1078 f = Future() 1079 f.add_done_callback(raising_fn) 1080 f.add_done_callback(fn) 1081 f.set_result(5) 1082 self.assertTrue(raising_was_called) 1083 self.assertTrue(fn_was_called) 1084 self.assertIn('Exception: doh!', stderr.getvalue()) 1085 1086 def test_done_callback_already_successful(self): 1087 callback_result = None 1088 def fn(callback_future): 1089 nonlocal callback_result 1090 callback_result = callback_future.result() 1091 1092 f = Future() 1093 f.set_result(5) 1094 f.add_done_callback(fn) 1095 self.assertEqual(5, callback_result) 1096 1097 def test_done_callback_already_failed(self): 1098 callback_exception = None 1099 def fn(callback_future): 1100 nonlocal callback_exception 1101 callback_exception = callback_future.exception() 1102 1103 f = Future() 1104 f.set_exception(Exception('test')) 1105 f.add_done_callback(fn) 1106 self.assertEqual(('test',), callback_exception.args) 1107 1108 def test_done_callback_already_cancelled(self): 1109 was_cancelled = None 1110 def fn(callback_future): 1111 nonlocal was_cancelled 1112 was_cancelled = callback_future.cancelled() 1113 1114 f = Future() 1115 self.assertTrue(f.cancel()) 1116 f.add_done_callback(fn) 1117 self.assertTrue(was_cancelled) 1118 1119 def test_done_callback_raises_already_succeeded(self): 1120 with test.support.captured_stderr() as stderr: 1121 def raising_fn(callback_future): 1122 raise Exception('doh!') 1123 1124 f = Future() 1125 1126 # Set the result first to simulate a future that runs instantly, 1127 # effectively allowing the callback to be run immediately. 1128 f.set_result(5) 1129 f.add_done_callback(raising_fn) 1130 1131 self.assertIn('exception calling callback for', stderr.getvalue()) 1132 self.assertIn('doh!', stderr.getvalue()) 1133 1134 1135 def test_repr(self): 1136 self.assertRegex(repr(PENDING_FUTURE), 1137 '<Future at 0x[0-9a-f]+ state=pending>') 1138 self.assertRegex(repr(RUNNING_FUTURE), 1139 '<Future at 0x[0-9a-f]+ state=running>') 1140 self.assertRegex(repr(CANCELLED_FUTURE), 1141 '<Future at 0x[0-9a-f]+ state=cancelled>') 1142 self.assertRegex(repr(CANCELLED_AND_NOTIFIED_FUTURE), 1143 '<Future at 0x[0-9a-f]+ state=cancelled>') 1144 self.assertRegex( 1145 repr(EXCEPTION_FUTURE), 1146 '<Future at 0x[0-9a-f]+ state=finished raised OSError>') 1147 self.assertRegex( 1148 repr(SUCCESSFUL_FUTURE), 1149 '<Future at 0x[0-9a-f]+ state=finished returned int>') 1150 1151 1152 def test_cancel(self): 1153 f1 = create_future(state=PENDING) 1154 f2 = create_future(state=RUNNING) 1155 f3 = create_future(state=CANCELLED) 1156 f4 = create_future(state=CANCELLED_AND_NOTIFIED) 1157 f5 = create_future(state=FINISHED, exception=OSError()) 1158 f6 = create_future(state=FINISHED, result=5) 1159 1160 self.assertTrue(f1.cancel()) 1161 self.assertEqual(f1._state, CANCELLED) 1162 1163 self.assertFalse(f2.cancel()) 1164 self.assertEqual(f2._state, RUNNING) 1165 1166 self.assertTrue(f3.cancel()) 1167 self.assertEqual(f3._state, CANCELLED) 1168 1169 self.assertTrue(f4.cancel()) 1170 self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED) 1171 1172 self.assertFalse(f5.cancel()) 1173 self.assertEqual(f5._state, FINISHED) 1174 1175 self.assertFalse(f6.cancel()) 1176 self.assertEqual(f6._state, FINISHED) 1177 1178 def test_cancelled(self): 1179 self.assertFalse(PENDING_FUTURE.cancelled()) 1180 self.assertFalse(RUNNING_FUTURE.cancelled()) 1181 self.assertTrue(CANCELLED_FUTURE.cancelled()) 1182 self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled()) 1183 self.assertFalse(EXCEPTION_FUTURE.cancelled()) 1184 self.assertFalse(SUCCESSFUL_FUTURE.cancelled()) 1185 1186 def test_done(self): 1187 self.assertFalse(PENDING_FUTURE.done()) 1188 self.assertFalse(RUNNING_FUTURE.done()) 1189 self.assertTrue(CANCELLED_FUTURE.done()) 1190 self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done()) 1191 self.assertTrue(EXCEPTION_FUTURE.done()) 1192 self.assertTrue(SUCCESSFUL_FUTURE.done()) 1193 1194 def test_running(self): 1195 self.assertFalse(PENDING_FUTURE.running()) 1196 self.assertTrue(RUNNING_FUTURE.running()) 1197 self.assertFalse(CANCELLED_FUTURE.running()) 1198 self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running()) 1199 self.assertFalse(EXCEPTION_FUTURE.running()) 1200 self.assertFalse(SUCCESSFUL_FUTURE.running()) 1201 1202 def test_result_with_timeout(self): 1203 self.assertRaises(futures.TimeoutError, 1204 PENDING_FUTURE.result, timeout=0) 1205 self.assertRaises(futures.TimeoutError, 1206 RUNNING_FUTURE.result, timeout=0) 1207 self.assertRaises(futures.CancelledError, 1208 CANCELLED_FUTURE.result, timeout=0) 1209 self.assertRaises(futures.CancelledError, 1210 CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0) 1211 self.assertRaises(OSError, EXCEPTION_FUTURE.result, timeout=0) 1212 self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42) 1213 1214 def test_result_with_success(self): 1215 # TODO(brian@sweetapp.com): This test is timing dependent. 1216 def notification(): 1217 # Wait until the main thread is waiting for the result. 1218 time.sleep(1) 1219 f1.set_result(42) 1220 1221 f1 = create_future(state=PENDING) 1222 t = threading.Thread(target=notification) 1223 t.start() 1224 1225 self.assertEqual(f1.result(timeout=5), 42) 1226 t.join() 1227 1228 def test_result_with_cancel(self): 1229 # TODO(brian@sweetapp.com): This test is timing dependent. 1230 def notification(): 1231 # Wait until the main thread is waiting for the result. 1232 time.sleep(1) 1233 f1.cancel() 1234 1235 f1 = create_future(state=PENDING) 1236 t = threading.Thread(target=notification) 1237 t.start() 1238 1239 self.assertRaises(futures.CancelledError, f1.result, timeout=5) 1240 t.join() 1241 1242 def test_exception_with_timeout(self): 1243 self.assertRaises(futures.TimeoutError, 1244 PENDING_FUTURE.exception, timeout=0) 1245 self.assertRaises(futures.TimeoutError, 1246 RUNNING_FUTURE.exception, timeout=0) 1247 self.assertRaises(futures.CancelledError, 1248 CANCELLED_FUTURE.exception, timeout=0) 1249 self.assertRaises(futures.CancelledError, 1250 CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0) 1251 self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0), 1252 OSError)) 1253 self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None) 1254 1255 def test_exception_with_success(self): 1256 def notification(): 1257 # Wait until the main thread is waiting for the exception. 1258 time.sleep(1) 1259 with f1._condition: 1260 f1._state = FINISHED 1261 f1._exception = OSError() 1262 f1._condition.notify_all() 1263 1264 f1 = create_future(state=PENDING) 1265 t = threading.Thread(target=notification) 1266 t.start() 1267 1268 self.assertTrue(isinstance(f1.exception(timeout=5), OSError)) 1269 t.join() 1270 1271 def test_multiple_set_result(self): 1272 f = create_future(state=PENDING) 1273 f.set_result(1) 1274 1275 with self.assertRaisesRegex( 1276 futures.InvalidStateError, 1277 'FINISHED: <Future at 0x[0-9a-f]+ ' 1278 'state=finished returned int>' 1279 ): 1280 f.set_result(2) 1281 1282 self.assertTrue(f.done()) 1283 self.assertEqual(f.result(), 1) 1284 1285 def test_multiple_set_exception(self): 1286 f = create_future(state=PENDING) 1287 e = ValueError() 1288 f.set_exception(e) 1289 1290 with self.assertRaisesRegex( 1291 futures.InvalidStateError, 1292 'FINISHED: <Future at 0x[0-9a-f]+ ' 1293 'state=finished raised ValueError>' 1294 ): 1295 f.set_exception(Exception()) 1296 1297 self.assertEqual(f.exception(), e) 1298 1299 1300_threads_key = None 1301 1302def setUpModule(): 1303 global _threads_key 1304 _threads_key = test.support.threading_setup() 1305 1306 1307def tearDownModule(): 1308 test.support.threading_cleanup(*_threads_key) 1309 multiprocessing.util._cleanup_tests() 1310 1311 1312if __name__ == "__main__": 1313 unittest.main() 1314