1# 2# Unit tests for the multiprocessing package 3# 4 5import unittest 6import unittest.mock 7import queue as pyqueue 8import textwrap 9import time 10import io 11import itertools 12import sys 13import os 14import gc 15import importlib 16import errno 17import functools 18import signal 19import array 20import socket 21import random 22import logging 23import shutil 24import subprocess 25import struct 26import tempfile 27import operator 28import pickle 29import weakref 30import warnings 31import test.support 32import test.support.script_helper 33from test import support 34from test.support import hashlib_helper 35from test.support import import_helper 36from test.support import os_helper 37from test.support import script_helper 38from test.support import socket_helper 39from test.support import threading_helper 40from test.support import warnings_helper 41 42 43# Skip tests if _multiprocessing wasn't built. 44_multiprocessing = import_helper.import_module('_multiprocessing') 45# Skip tests if sem_open implementation is broken. 46support.skip_if_broken_multiprocessing_synchronize() 47import threading 48 49import multiprocessing.connection 50import multiprocessing.dummy 51import multiprocessing.heap 52import multiprocessing.managers 53import multiprocessing.pool 54import multiprocessing.queues 55from multiprocessing.connection import wait 56 57from multiprocessing import util 58 59try: 60 from multiprocessing import reduction 61 HAS_REDUCTION = reduction.HAVE_SEND_HANDLE 62except ImportError: 63 HAS_REDUCTION = False 64 65try: 66 from multiprocessing.sharedctypes import Value, copy 67 HAS_SHAREDCTYPES = True 68except ImportError: 69 HAS_SHAREDCTYPES = False 70 71try: 72 from multiprocessing import shared_memory 73 HAS_SHMEM = True 74except ImportError: 75 HAS_SHMEM = False 76 77try: 78 import msvcrt 79except ImportError: 80 msvcrt = None 81 82 83if support.HAVE_ASAN_FORK_BUG: 84 # gh-89363: Skip multiprocessing tests if Python is built with ASAN to 85 # work around a libasan race condition: dead lock in pthread_create(). 86 raise unittest.SkipTest("libasan has a pthread_create() dead lock related to thread+fork") 87 88 89# gh-110666: Tolerate a difference of 100 ms when comparing timings 90# (clock resolution) 91CLOCK_RES = 0.100 92 93 94def latin(s): 95 return s.encode('latin') 96 97 98def close_queue(queue): 99 if isinstance(queue, multiprocessing.queues.Queue): 100 queue.close() 101 queue.join_thread() 102 103 104def join_process(process): 105 # Since multiprocessing.Process has the same API than threading.Thread 106 # (join() and is_alive(), the support function can be reused 107 threading_helper.join_thread(process) 108 109 110if os.name == "posix": 111 from multiprocessing import resource_tracker 112 113 def _resource_unlink(name, rtype): 114 resource_tracker._CLEANUP_FUNCS[rtype](name) 115 116 117# 118# Constants 119# 120 121LOG_LEVEL = util.SUBWARNING 122#LOG_LEVEL = logging.DEBUG 123 124DELTA = 0.1 125CHECK_TIMINGS = False # making true makes tests take a lot longer 126 # and can sometimes cause some non-serious 127 # failures because some calls block a bit 128 # longer than expected 129if CHECK_TIMINGS: 130 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4 131else: 132 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1 133 134# BaseManager.shutdown_timeout 135SHUTDOWN_TIMEOUT = support.SHORT_TIMEOUT 136 137WAIT_ACTIVE_CHILDREN_TIMEOUT = 5.0 138 139HAVE_GETVALUE = not getattr(_multiprocessing, 140 'HAVE_BROKEN_SEM_GETVALUE', False) 141 142WIN32 = (sys.platform == "win32") 143 144def wait_for_handle(handle, timeout): 145 if timeout is not None and timeout < 0.0: 146 timeout = None 147 return wait([handle], timeout) 148 149try: 150 MAXFD = os.sysconf("SC_OPEN_MAX") 151except: 152 MAXFD = 256 153 154# To speed up tests when using the forkserver, we can preload these: 155PRELOAD = ['__main__', 'test.test_multiprocessing_forkserver'] 156 157# 158# Some tests require ctypes 159# 160 161try: 162 from ctypes import Structure, c_int, c_double, c_longlong 163except ImportError: 164 Structure = object 165 c_int = c_double = c_longlong = None 166 167 168def check_enough_semaphores(): 169 """Check that the system supports enough semaphores to run the test.""" 170 # minimum number of semaphores available according to POSIX 171 nsems_min = 256 172 try: 173 nsems = os.sysconf("SC_SEM_NSEMS_MAX") 174 except (AttributeError, ValueError): 175 # sysconf not available or setting not available 176 return 177 if nsems == -1 or nsems >= nsems_min: 178 return 179 raise unittest.SkipTest("The OS doesn't support enough semaphores " 180 "to run the test (required: %d)." % nsems_min) 181 182 183def only_run_in_spawn_testsuite(reason): 184 """Returns a decorator: raises SkipTest when SM != spawn at test time. 185 186 This can be useful to save overall Python test suite execution time. 187 "spawn" is the universal mode available on all platforms so this limits the 188 decorated test to only execute within test_multiprocessing_spawn. 189 190 This would not be necessary if we refactored our test suite to split things 191 into other test files when they are not start method specific to be rerun 192 under all start methods. 193 """ 194 195 def decorator(test_item): 196 197 @functools.wraps(test_item) 198 def spawn_check_wrapper(*args, **kwargs): 199 if (start_method := multiprocessing.get_start_method()) != "spawn": 200 raise unittest.SkipTest(f"{start_method=}, not 'spawn'; {reason}") 201 return test_item(*args, **kwargs) 202 203 return spawn_check_wrapper 204 205 return decorator 206 207 208class TestInternalDecorators(unittest.TestCase): 209 """Logic within a test suite that could errantly skip tests? Test it!""" 210 211 @unittest.skipIf(sys.platform == "win32", "test requires that fork exists.") 212 def test_only_run_in_spawn_testsuite(self): 213 if multiprocessing.get_start_method() != "spawn": 214 raise unittest.SkipTest("only run in test_multiprocessing_spawn.") 215 216 try: 217 @only_run_in_spawn_testsuite("testing this decorator") 218 def return_four_if_spawn(): 219 return 4 220 except Exception as err: 221 self.fail(f"expected decorated `def` not to raise; caught {err}") 222 223 orig_start_method = multiprocessing.get_start_method(allow_none=True) 224 try: 225 multiprocessing.set_start_method("spawn", force=True) 226 self.assertEqual(return_four_if_spawn(), 4) 227 multiprocessing.set_start_method("fork", force=True) 228 with self.assertRaises(unittest.SkipTest) as ctx: 229 return_four_if_spawn() 230 self.assertIn("testing this decorator", str(ctx.exception)) 231 self.assertIn("start_method=", str(ctx.exception)) 232 finally: 233 multiprocessing.set_start_method(orig_start_method, force=True) 234 235 236# 237# Creates a wrapper for a function which records the time it takes to finish 238# 239 240class TimingWrapper(object): 241 242 def __init__(self, func): 243 self.func = func 244 self.elapsed = None 245 246 def __call__(self, *args, **kwds): 247 t = time.monotonic() 248 try: 249 return self.func(*args, **kwds) 250 finally: 251 self.elapsed = time.monotonic() - t 252 253# 254# Base class for test cases 255# 256 257class BaseTestCase(object): 258 259 ALLOWED_TYPES = ('processes', 'manager', 'threads') 260 # If not empty, limit which start method suites run this class. 261 START_METHODS: set[str] = set() 262 start_method = None # set by install_tests_in_module_dict() 263 264 def assertTimingAlmostEqual(self, a, b): 265 if CHECK_TIMINGS: 266 self.assertAlmostEqual(a, b, 1) 267 268 def assertReturnsIfImplemented(self, value, func, *args): 269 try: 270 res = func(*args) 271 except NotImplementedError: 272 pass 273 else: 274 return self.assertEqual(value, res) 275 276 # For the sanity of Windows users, rather than crashing or freezing in 277 # multiple ways. 278 def __reduce__(self, *args): 279 raise NotImplementedError("shouldn't try to pickle a test case") 280 281 __reduce_ex__ = __reduce__ 282 283# 284# Return the value of a semaphore 285# 286 287def get_value(self): 288 try: 289 return self.get_value() 290 except AttributeError: 291 try: 292 return self._Semaphore__value 293 except AttributeError: 294 try: 295 return self._value 296 except AttributeError: 297 raise NotImplementedError 298 299# 300# Testcases 301# 302 303class DummyCallable: 304 def __call__(self, q, c): 305 assert isinstance(c, DummyCallable) 306 q.put(5) 307 308 309class _TestProcess(BaseTestCase): 310 311 ALLOWED_TYPES = ('processes', 'threads') 312 313 def test_current(self): 314 if self.TYPE == 'threads': 315 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 316 317 current = self.current_process() 318 authkey = current.authkey 319 320 self.assertTrue(current.is_alive()) 321 self.assertTrue(not current.daemon) 322 self.assertIsInstance(authkey, bytes) 323 self.assertTrue(len(authkey) > 0) 324 self.assertEqual(current.ident, os.getpid()) 325 self.assertEqual(current.exitcode, None) 326 327 def test_set_executable(self): 328 if self.TYPE == 'threads': 329 self.skipTest(f'test not appropriate for {self.TYPE}') 330 paths = [ 331 sys.executable, # str 332 os.fsencode(sys.executable), # bytes 333 os_helper.FakePath(sys.executable), # os.PathLike 334 os_helper.FakePath(os.fsencode(sys.executable)), # os.PathLike bytes 335 ] 336 for path in paths: 337 self.set_executable(path) 338 p = self.Process() 339 p.start() 340 p.join() 341 self.assertEqual(p.exitcode, 0) 342 343 @support.requires_resource('cpu') 344 def test_args_argument(self): 345 # bpo-45735: Using list or tuple as *args* in constructor could 346 # achieve the same effect. 347 args_cases = (1, "str", [1], (1,)) 348 args_types = (list, tuple) 349 350 test_cases = itertools.product(args_cases, args_types) 351 352 for args, args_type in test_cases: 353 with self.subTest(args=args, args_type=args_type): 354 q = self.Queue(1) 355 # pass a tuple or list as args 356 p = self.Process(target=self._test_args, args=args_type((q, args))) 357 p.daemon = True 358 p.start() 359 child_args = q.get() 360 self.assertEqual(child_args, args) 361 p.join() 362 close_queue(q) 363 364 @classmethod 365 def _test_args(cls, q, arg): 366 q.put(arg) 367 368 def test_daemon_argument(self): 369 if self.TYPE == "threads": 370 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 371 372 # By default uses the current process's daemon flag. 373 proc0 = self.Process(target=self._test) 374 self.assertEqual(proc0.daemon, self.current_process().daemon) 375 proc1 = self.Process(target=self._test, daemon=True) 376 self.assertTrue(proc1.daemon) 377 proc2 = self.Process(target=self._test, daemon=False) 378 self.assertFalse(proc2.daemon) 379 380 @classmethod 381 def _test(cls, q, *args, **kwds): 382 current = cls.current_process() 383 q.put(args) 384 q.put(kwds) 385 q.put(current.name) 386 if cls.TYPE != 'threads': 387 q.put(bytes(current.authkey)) 388 q.put(current.pid) 389 390 def test_parent_process_attributes(self): 391 if self.TYPE == "threads": 392 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 393 394 self.assertIsNone(self.parent_process()) 395 396 rconn, wconn = self.Pipe(duplex=False) 397 p = self.Process(target=self._test_send_parent_process, args=(wconn,)) 398 p.start() 399 p.join() 400 parent_pid, parent_name = rconn.recv() 401 self.assertEqual(parent_pid, self.current_process().pid) 402 self.assertEqual(parent_pid, os.getpid()) 403 self.assertEqual(parent_name, self.current_process().name) 404 405 @classmethod 406 def _test_send_parent_process(cls, wconn): 407 from multiprocessing.process import parent_process 408 wconn.send([parent_process().pid, parent_process().name]) 409 410 def test_parent_process(self): 411 if self.TYPE == "threads": 412 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 413 414 # Launch a child process. Make it launch a grandchild process. Kill the 415 # child process and make sure that the grandchild notices the death of 416 # its parent (a.k.a the child process). 417 rconn, wconn = self.Pipe(duplex=False) 418 p = self.Process( 419 target=self._test_create_grandchild_process, args=(wconn, )) 420 p.start() 421 422 if not rconn.poll(timeout=support.LONG_TIMEOUT): 423 raise AssertionError("Could not communicate with child process") 424 parent_process_status = rconn.recv() 425 self.assertEqual(parent_process_status, "alive") 426 427 p.terminate() 428 p.join() 429 430 if not rconn.poll(timeout=support.LONG_TIMEOUT): 431 raise AssertionError("Could not communicate with child process") 432 parent_process_status = rconn.recv() 433 self.assertEqual(parent_process_status, "not alive") 434 435 @classmethod 436 def _test_create_grandchild_process(cls, wconn): 437 p = cls.Process(target=cls._test_report_parent_status, args=(wconn, )) 438 p.start() 439 time.sleep(300) 440 441 @classmethod 442 def _test_report_parent_status(cls, wconn): 443 from multiprocessing.process import parent_process 444 wconn.send("alive" if parent_process().is_alive() else "not alive") 445 parent_process().join(timeout=support.SHORT_TIMEOUT) 446 wconn.send("alive" if parent_process().is_alive() else "not alive") 447 448 def test_process(self): 449 q = self.Queue(1) 450 e = self.Event() 451 args = (q, 1, 2) 452 kwargs = {'hello':23, 'bye':2.54} 453 name = 'SomeProcess' 454 p = self.Process( 455 target=self._test, args=args, kwargs=kwargs, name=name 456 ) 457 p.daemon = True 458 current = self.current_process() 459 460 if self.TYPE != 'threads': 461 self.assertEqual(p.authkey, current.authkey) 462 self.assertEqual(p.is_alive(), False) 463 self.assertEqual(p.daemon, True) 464 self.assertNotIn(p, self.active_children()) 465 self.assertTrue(type(self.active_children()) is list) 466 self.assertEqual(p.exitcode, None) 467 468 p.start() 469 470 self.assertEqual(p.exitcode, None) 471 self.assertEqual(p.is_alive(), True) 472 self.assertIn(p, self.active_children()) 473 474 self.assertEqual(q.get(), args[1:]) 475 self.assertEqual(q.get(), kwargs) 476 self.assertEqual(q.get(), p.name) 477 if self.TYPE != 'threads': 478 self.assertEqual(q.get(), current.authkey) 479 self.assertEqual(q.get(), p.pid) 480 481 p.join() 482 483 self.assertEqual(p.exitcode, 0) 484 self.assertEqual(p.is_alive(), False) 485 self.assertNotIn(p, self.active_children()) 486 close_queue(q) 487 488 @unittest.skipUnless(threading._HAVE_THREAD_NATIVE_ID, "needs native_id") 489 def test_process_mainthread_native_id(self): 490 if self.TYPE == 'threads': 491 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 492 493 current_mainthread_native_id = threading.main_thread().native_id 494 495 q = self.Queue(1) 496 p = self.Process(target=self._test_process_mainthread_native_id, args=(q,)) 497 p.start() 498 499 child_mainthread_native_id = q.get() 500 p.join() 501 close_queue(q) 502 503 self.assertNotEqual(current_mainthread_native_id, child_mainthread_native_id) 504 505 @classmethod 506 def _test_process_mainthread_native_id(cls, q): 507 mainthread_native_id = threading.main_thread().native_id 508 q.put(mainthread_native_id) 509 510 @classmethod 511 def _sleep_some(cls): 512 time.sleep(100) 513 514 @classmethod 515 def _test_sleep(cls, delay): 516 time.sleep(delay) 517 518 def _kill_process(self, meth): 519 if self.TYPE == 'threads': 520 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 521 522 p = self.Process(target=self._sleep_some) 523 p.daemon = True 524 p.start() 525 526 self.assertEqual(p.is_alive(), True) 527 self.assertIn(p, self.active_children()) 528 self.assertEqual(p.exitcode, None) 529 530 join = TimingWrapper(p.join) 531 532 self.assertEqual(join(0), None) 533 self.assertTimingAlmostEqual(join.elapsed, 0.0) 534 self.assertEqual(p.is_alive(), True) 535 536 self.assertEqual(join(-1), None) 537 self.assertTimingAlmostEqual(join.elapsed, 0.0) 538 self.assertEqual(p.is_alive(), True) 539 540 # XXX maybe terminating too soon causes the problems on Gentoo... 541 time.sleep(1) 542 543 meth(p) 544 545 if hasattr(signal, 'alarm'): 546 # On the Gentoo buildbot waitpid() often seems to block forever. 547 # We use alarm() to interrupt it if it blocks for too long. 548 def handler(*args): 549 raise RuntimeError('join took too long: %s' % p) 550 old_handler = signal.signal(signal.SIGALRM, handler) 551 try: 552 signal.alarm(10) 553 self.assertEqual(join(), None) 554 finally: 555 signal.alarm(0) 556 signal.signal(signal.SIGALRM, old_handler) 557 else: 558 self.assertEqual(join(), None) 559 560 self.assertTimingAlmostEqual(join.elapsed, 0.0) 561 562 self.assertEqual(p.is_alive(), False) 563 self.assertNotIn(p, self.active_children()) 564 565 p.join() 566 567 return p.exitcode 568 569 def test_terminate(self): 570 exitcode = self._kill_process(multiprocessing.Process.terminate) 571 self.assertEqual(exitcode, -signal.SIGTERM) 572 573 def test_kill(self): 574 exitcode = self._kill_process(multiprocessing.Process.kill) 575 if os.name != 'nt': 576 self.assertEqual(exitcode, -signal.SIGKILL) 577 else: 578 self.assertEqual(exitcode, -signal.SIGTERM) 579 580 def test_cpu_count(self): 581 try: 582 cpus = multiprocessing.cpu_count() 583 except NotImplementedError: 584 cpus = 1 585 self.assertTrue(type(cpus) is int) 586 self.assertTrue(cpus >= 1) 587 588 def test_active_children(self): 589 self.assertEqual(type(self.active_children()), list) 590 591 p = self.Process(target=time.sleep, args=(DELTA,)) 592 self.assertNotIn(p, self.active_children()) 593 594 p.daemon = True 595 p.start() 596 self.assertIn(p, self.active_children()) 597 598 p.join() 599 self.assertNotIn(p, self.active_children()) 600 601 @classmethod 602 def _test_recursion(cls, wconn, id): 603 wconn.send(id) 604 if len(id) < 2: 605 for i in range(2): 606 p = cls.Process( 607 target=cls._test_recursion, args=(wconn, id+[i]) 608 ) 609 p.start() 610 p.join() 611 612 def test_recursion(self): 613 rconn, wconn = self.Pipe(duplex=False) 614 self._test_recursion(wconn, []) 615 616 time.sleep(DELTA) 617 result = [] 618 while rconn.poll(): 619 result.append(rconn.recv()) 620 621 expected = [ 622 [], 623 [0], 624 [0, 0], 625 [0, 1], 626 [1], 627 [1, 0], 628 [1, 1] 629 ] 630 self.assertEqual(result, expected) 631 632 @classmethod 633 def _test_sentinel(cls, event): 634 event.wait(10.0) 635 636 def test_sentinel(self): 637 if self.TYPE == "threads": 638 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 639 event = self.Event() 640 p = self.Process(target=self._test_sentinel, args=(event,)) 641 with self.assertRaises(ValueError): 642 p.sentinel 643 p.start() 644 self.addCleanup(p.join) 645 sentinel = p.sentinel 646 self.assertIsInstance(sentinel, int) 647 self.assertFalse(wait_for_handle(sentinel, timeout=0.0)) 648 event.set() 649 p.join() 650 self.assertTrue(wait_for_handle(sentinel, timeout=1)) 651 652 @classmethod 653 def _test_close(cls, rc=0, q=None): 654 if q is not None: 655 q.get() 656 sys.exit(rc) 657 658 def test_close(self): 659 if self.TYPE == "threads": 660 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 661 q = self.Queue() 662 p = self.Process(target=self._test_close, kwargs={'q': q}) 663 p.daemon = True 664 p.start() 665 self.assertEqual(p.is_alive(), True) 666 # Child is still alive, cannot close 667 with self.assertRaises(ValueError): 668 p.close() 669 670 q.put(None) 671 p.join() 672 self.assertEqual(p.is_alive(), False) 673 self.assertEqual(p.exitcode, 0) 674 p.close() 675 with self.assertRaises(ValueError): 676 p.is_alive() 677 with self.assertRaises(ValueError): 678 p.join() 679 with self.assertRaises(ValueError): 680 p.terminate() 681 p.close() 682 683 wr = weakref.ref(p) 684 del p 685 gc.collect() 686 self.assertIs(wr(), None) 687 688 close_queue(q) 689 690 @support.requires_resource('walltime') 691 def test_many_processes(self): 692 if self.TYPE == 'threads': 693 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 694 695 sm = multiprocessing.get_start_method() 696 N = 5 if sm == 'spawn' else 100 697 698 # Try to overwhelm the forkserver loop with events 699 procs = [self.Process(target=self._test_sleep, args=(0.01,)) 700 for i in range(N)] 701 for p in procs: 702 p.start() 703 for p in procs: 704 join_process(p) 705 for p in procs: 706 self.assertEqual(p.exitcode, 0) 707 708 procs = [self.Process(target=self._sleep_some) 709 for i in range(N)] 710 for p in procs: 711 p.start() 712 time.sleep(0.001) # let the children start... 713 for p in procs: 714 p.terminate() 715 for p in procs: 716 join_process(p) 717 if os.name != 'nt': 718 exitcodes = [-signal.SIGTERM] 719 if sys.platform == 'darwin': 720 # bpo-31510: On macOS, killing a freshly started process with 721 # SIGTERM sometimes kills the process with SIGKILL. 722 exitcodes.append(-signal.SIGKILL) 723 for p in procs: 724 self.assertIn(p.exitcode, exitcodes) 725 726 def test_lose_target_ref(self): 727 c = DummyCallable() 728 wr = weakref.ref(c) 729 q = self.Queue() 730 p = self.Process(target=c, args=(q, c)) 731 del c 732 p.start() 733 p.join() 734 gc.collect() # For PyPy or other GCs. 735 self.assertIs(wr(), None) 736 self.assertEqual(q.get(), 5) 737 close_queue(q) 738 739 @classmethod 740 def _test_child_fd_inflation(self, evt, q): 741 q.put(os_helper.fd_count()) 742 evt.wait() 743 744 def test_child_fd_inflation(self): 745 # Number of fds in child processes should not grow with the 746 # number of running children. 747 if self.TYPE == 'threads': 748 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 749 750 sm = multiprocessing.get_start_method() 751 if sm == 'fork': 752 # The fork method by design inherits all fds from the parent, 753 # trying to go against it is a lost battle 754 self.skipTest('test not appropriate for {}'.format(sm)) 755 756 N = 5 757 evt = self.Event() 758 q = self.Queue() 759 760 procs = [self.Process(target=self._test_child_fd_inflation, args=(evt, q)) 761 for i in range(N)] 762 for p in procs: 763 p.start() 764 765 try: 766 fd_counts = [q.get() for i in range(N)] 767 self.assertEqual(len(set(fd_counts)), 1, fd_counts) 768 769 finally: 770 evt.set() 771 for p in procs: 772 p.join() 773 close_queue(q) 774 775 @classmethod 776 def _test_wait_for_threads(self, evt): 777 def func1(): 778 time.sleep(0.5) 779 evt.set() 780 781 def func2(): 782 time.sleep(20) 783 evt.clear() 784 785 threading.Thread(target=func1).start() 786 threading.Thread(target=func2, daemon=True).start() 787 788 def test_wait_for_threads(self): 789 # A child process should wait for non-daemonic threads to end 790 # before exiting 791 if self.TYPE == 'threads': 792 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 793 794 evt = self.Event() 795 proc = self.Process(target=self._test_wait_for_threads, args=(evt,)) 796 proc.start() 797 proc.join() 798 self.assertTrue(evt.is_set()) 799 800 @classmethod 801 def _test_error_on_stdio_flush(self, evt, break_std_streams={}): 802 for stream_name, action in break_std_streams.items(): 803 if action == 'close': 804 stream = io.StringIO() 805 stream.close() 806 else: 807 assert action == 'remove' 808 stream = None 809 setattr(sys, stream_name, None) 810 evt.set() 811 812 def test_error_on_stdio_flush_1(self): 813 # Check that Process works with broken standard streams 814 streams = [io.StringIO(), None] 815 streams[0].close() 816 for stream_name in ('stdout', 'stderr'): 817 for stream in streams: 818 old_stream = getattr(sys, stream_name) 819 setattr(sys, stream_name, stream) 820 try: 821 evt = self.Event() 822 proc = self.Process(target=self._test_error_on_stdio_flush, 823 args=(evt,)) 824 proc.start() 825 proc.join() 826 self.assertTrue(evt.is_set()) 827 self.assertEqual(proc.exitcode, 0) 828 finally: 829 setattr(sys, stream_name, old_stream) 830 831 def test_error_on_stdio_flush_2(self): 832 # Same as test_error_on_stdio_flush_1(), but standard streams are 833 # broken by the child process 834 for stream_name in ('stdout', 'stderr'): 835 for action in ('close', 'remove'): 836 old_stream = getattr(sys, stream_name) 837 try: 838 evt = self.Event() 839 proc = self.Process(target=self._test_error_on_stdio_flush, 840 args=(evt, {stream_name: action})) 841 proc.start() 842 proc.join() 843 self.assertTrue(evt.is_set()) 844 self.assertEqual(proc.exitcode, 0) 845 finally: 846 setattr(sys, stream_name, old_stream) 847 848 @classmethod 849 def _sleep_and_set_event(self, evt, delay=0.0): 850 time.sleep(delay) 851 evt.set() 852 853 def check_forkserver_death(self, signum): 854 # bpo-31308: if the forkserver process has died, we should still 855 # be able to create and run new Process instances (the forkserver 856 # is implicitly restarted). 857 if self.TYPE == 'threads': 858 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 859 sm = multiprocessing.get_start_method() 860 if sm != 'forkserver': 861 # The fork method by design inherits all fds from the parent, 862 # trying to go against it is a lost battle 863 self.skipTest('test not appropriate for {}'.format(sm)) 864 865 from multiprocessing.forkserver import _forkserver 866 _forkserver.ensure_running() 867 868 # First process sleeps 500 ms 869 delay = 0.5 870 871 evt = self.Event() 872 proc = self.Process(target=self._sleep_and_set_event, args=(evt, delay)) 873 proc.start() 874 875 pid = _forkserver._forkserver_pid 876 os.kill(pid, signum) 877 # give time to the fork server to die and time to proc to complete 878 time.sleep(delay * 2.0) 879 880 evt2 = self.Event() 881 proc2 = self.Process(target=self._sleep_and_set_event, args=(evt2,)) 882 proc2.start() 883 proc2.join() 884 self.assertTrue(evt2.is_set()) 885 self.assertEqual(proc2.exitcode, 0) 886 887 proc.join() 888 self.assertTrue(evt.is_set()) 889 self.assertIn(proc.exitcode, (0, 255)) 890 891 def test_forkserver_sigint(self): 892 # Catchable signal 893 self.check_forkserver_death(signal.SIGINT) 894 895 def test_forkserver_sigkill(self): 896 # Uncatchable signal 897 if os.name != 'nt': 898 self.check_forkserver_death(signal.SIGKILL) 899 900 901# 902# 903# 904 905class _UpperCaser(multiprocessing.Process): 906 907 def __init__(self): 908 multiprocessing.Process.__init__(self) 909 self.child_conn, self.parent_conn = multiprocessing.Pipe() 910 911 def run(self): 912 self.parent_conn.close() 913 for s in iter(self.child_conn.recv, None): 914 self.child_conn.send(s.upper()) 915 self.child_conn.close() 916 917 def submit(self, s): 918 assert type(s) is str 919 self.parent_conn.send(s) 920 return self.parent_conn.recv() 921 922 def stop(self): 923 self.parent_conn.send(None) 924 self.parent_conn.close() 925 self.child_conn.close() 926 927class _TestSubclassingProcess(BaseTestCase): 928 929 ALLOWED_TYPES = ('processes',) 930 931 def test_subclassing(self): 932 uppercaser = _UpperCaser() 933 uppercaser.daemon = True 934 uppercaser.start() 935 self.assertEqual(uppercaser.submit('hello'), 'HELLO') 936 self.assertEqual(uppercaser.submit('world'), 'WORLD') 937 uppercaser.stop() 938 uppercaser.join() 939 940 def test_stderr_flush(self): 941 # sys.stderr is flushed at process shutdown (issue #13812) 942 if self.TYPE == "threads": 943 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 944 945 testfn = os_helper.TESTFN 946 self.addCleanup(os_helper.unlink, testfn) 947 proc = self.Process(target=self._test_stderr_flush, args=(testfn,)) 948 proc.start() 949 proc.join() 950 with open(testfn, encoding="utf-8") as f: 951 err = f.read() 952 # The whole traceback was printed 953 self.assertIn("ZeroDivisionError", err) 954 self.assertIn("test_multiprocessing.py", err) 955 self.assertIn("1/0 # MARKER", err) 956 957 @classmethod 958 def _test_stderr_flush(cls, testfn): 959 fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL) 960 sys.stderr = open(fd, 'w', encoding="utf-8", closefd=False) 961 1/0 # MARKER 962 963 964 @classmethod 965 def _test_sys_exit(cls, reason, testfn): 966 fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL) 967 sys.stderr = open(fd, 'w', encoding="utf-8", closefd=False) 968 sys.exit(reason) 969 970 def test_sys_exit(self): 971 # See Issue 13854 972 if self.TYPE == 'threads': 973 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 974 975 testfn = os_helper.TESTFN 976 self.addCleanup(os_helper.unlink, testfn) 977 978 for reason in ( 979 [1, 2, 3], 980 'ignore this', 981 ): 982 p = self.Process(target=self._test_sys_exit, args=(reason, testfn)) 983 p.daemon = True 984 p.start() 985 join_process(p) 986 self.assertEqual(p.exitcode, 1) 987 988 with open(testfn, encoding="utf-8") as f: 989 content = f.read() 990 self.assertEqual(content.rstrip(), str(reason)) 991 992 os.unlink(testfn) 993 994 cases = [ 995 ((True,), 1), 996 ((False,), 0), 997 ((8,), 8), 998 ((None,), 0), 999 ((), 0), 1000 ] 1001 1002 for args, expected in cases: 1003 with self.subTest(args=args): 1004 p = self.Process(target=sys.exit, args=args) 1005 p.daemon = True 1006 p.start() 1007 join_process(p) 1008 self.assertEqual(p.exitcode, expected) 1009 1010# 1011# 1012# 1013 1014def queue_empty(q): 1015 if hasattr(q, 'empty'): 1016 return q.empty() 1017 else: 1018 return q.qsize() == 0 1019 1020def queue_full(q, maxsize): 1021 if hasattr(q, 'full'): 1022 return q.full() 1023 else: 1024 return q.qsize() == maxsize 1025 1026 1027class _TestQueue(BaseTestCase): 1028 1029 1030 @classmethod 1031 def _test_put(cls, queue, child_can_start, parent_can_continue): 1032 child_can_start.wait() 1033 for i in range(6): 1034 queue.get() 1035 parent_can_continue.set() 1036 1037 def test_put(self): 1038 MAXSIZE = 6 1039 queue = self.Queue(maxsize=MAXSIZE) 1040 child_can_start = self.Event() 1041 parent_can_continue = self.Event() 1042 1043 proc = self.Process( 1044 target=self._test_put, 1045 args=(queue, child_can_start, parent_can_continue) 1046 ) 1047 proc.daemon = True 1048 proc.start() 1049 1050 self.assertEqual(queue_empty(queue), True) 1051 self.assertEqual(queue_full(queue, MAXSIZE), False) 1052 1053 queue.put(1) 1054 queue.put(2, True) 1055 queue.put(3, True, None) 1056 queue.put(4, False) 1057 queue.put(5, False, None) 1058 queue.put_nowait(6) 1059 1060 # the values may be in buffer but not yet in pipe so sleep a bit 1061 time.sleep(DELTA) 1062 1063 self.assertEqual(queue_empty(queue), False) 1064 self.assertEqual(queue_full(queue, MAXSIZE), True) 1065 1066 put = TimingWrapper(queue.put) 1067 put_nowait = TimingWrapper(queue.put_nowait) 1068 1069 self.assertRaises(pyqueue.Full, put, 7, False) 1070 self.assertTimingAlmostEqual(put.elapsed, 0) 1071 1072 self.assertRaises(pyqueue.Full, put, 7, False, None) 1073 self.assertTimingAlmostEqual(put.elapsed, 0) 1074 1075 self.assertRaises(pyqueue.Full, put_nowait, 7) 1076 self.assertTimingAlmostEqual(put_nowait.elapsed, 0) 1077 1078 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1) 1079 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1) 1080 1081 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2) 1082 self.assertTimingAlmostEqual(put.elapsed, 0) 1083 1084 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3) 1085 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3) 1086 1087 child_can_start.set() 1088 parent_can_continue.wait() 1089 1090 self.assertEqual(queue_empty(queue), True) 1091 self.assertEqual(queue_full(queue, MAXSIZE), False) 1092 1093 proc.join() 1094 close_queue(queue) 1095 1096 @classmethod 1097 def _test_get(cls, queue, child_can_start, parent_can_continue): 1098 child_can_start.wait() 1099 #queue.put(1) 1100 queue.put(2) 1101 queue.put(3) 1102 queue.put(4) 1103 queue.put(5) 1104 parent_can_continue.set() 1105 1106 def test_get(self): 1107 queue = self.Queue() 1108 child_can_start = self.Event() 1109 parent_can_continue = self.Event() 1110 1111 proc = self.Process( 1112 target=self._test_get, 1113 args=(queue, child_can_start, parent_can_continue) 1114 ) 1115 proc.daemon = True 1116 proc.start() 1117 1118 self.assertEqual(queue_empty(queue), True) 1119 1120 child_can_start.set() 1121 parent_can_continue.wait() 1122 1123 time.sleep(DELTA) 1124 self.assertEqual(queue_empty(queue), False) 1125 1126 # Hangs unexpectedly, remove for now 1127 #self.assertEqual(queue.get(), 1) 1128 self.assertEqual(queue.get(True, None), 2) 1129 self.assertEqual(queue.get(True), 3) 1130 self.assertEqual(queue.get(timeout=1), 4) 1131 self.assertEqual(queue.get_nowait(), 5) 1132 1133 self.assertEqual(queue_empty(queue), True) 1134 1135 get = TimingWrapper(queue.get) 1136 get_nowait = TimingWrapper(queue.get_nowait) 1137 1138 self.assertRaises(pyqueue.Empty, get, False) 1139 self.assertTimingAlmostEqual(get.elapsed, 0) 1140 1141 self.assertRaises(pyqueue.Empty, get, False, None) 1142 self.assertTimingAlmostEqual(get.elapsed, 0) 1143 1144 self.assertRaises(pyqueue.Empty, get_nowait) 1145 self.assertTimingAlmostEqual(get_nowait.elapsed, 0) 1146 1147 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1) 1148 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1) 1149 1150 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2) 1151 self.assertTimingAlmostEqual(get.elapsed, 0) 1152 1153 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3) 1154 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3) 1155 1156 proc.join() 1157 close_queue(queue) 1158 1159 @classmethod 1160 def _test_fork(cls, queue): 1161 for i in range(10, 20): 1162 queue.put(i) 1163 # note that at this point the items may only be buffered, so the 1164 # process cannot shutdown until the feeder thread has finished 1165 # pushing items onto the pipe. 1166 1167 def test_fork(self): 1168 # Old versions of Queue would fail to create a new feeder 1169 # thread for a forked process if the original process had its 1170 # own feeder thread. This test checks that this no longer 1171 # happens. 1172 1173 queue = self.Queue() 1174 1175 # put items on queue so that main process starts a feeder thread 1176 for i in range(10): 1177 queue.put(i) 1178 1179 # wait to make sure thread starts before we fork a new process 1180 time.sleep(DELTA) 1181 1182 # fork process 1183 p = self.Process(target=self._test_fork, args=(queue,)) 1184 p.daemon = True 1185 p.start() 1186 1187 # check that all expected items are in the queue 1188 for i in range(20): 1189 self.assertEqual(queue.get(), i) 1190 self.assertRaises(pyqueue.Empty, queue.get, False) 1191 1192 p.join() 1193 close_queue(queue) 1194 1195 def test_qsize(self): 1196 q = self.Queue() 1197 try: 1198 self.assertEqual(q.qsize(), 0) 1199 except NotImplementedError: 1200 self.skipTest('qsize method not implemented') 1201 q.put(1) 1202 self.assertEqual(q.qsize(), 1) 1203 q.put(5) 1204 self.assertEqual(q.qsize(), 2) 1205 q.get() 1206 self.assertEqual(q.qsize(), 1) 1207 q.get() 1208 self.assertEqual(q.qsize(), 0) 1209 close_queue(q) 1210 1211 @classmethod 1212 def _test_task_done(cls, q): 1213 for obj in iter(q.get, None): 1214 time.sleep(DELTA) 1215 q.task_done() 1216 1217 def test_task_done(self): 1218 queue = self.JoinableQueue() 1219 1220 workers = [self.Process(target=self._test_task_done, args=(queue,)) 1221 for i in range(4)] 1222 1223 for p in workers: 1224 p.daemon = True 1225 p.start() 1226 1227 for i in range(10): 1228 queue.put(i) 1229 1230 queue.join() 1231 1232 for p in workers: 1233 queue.put(None) 1234 1235 for p in workers: 1236 p.join() 1237 close_queue(queue) 1238 1239 def test_no_import_lock_contention(self): 1240 with os_helper.temp_cwd(): 1241 module_name = 'imported_by_an_imported_module' 1242 with open(module_name + '.py', 'w', encoding="utf-8") as f: 1243 f.write("""if 1: 1244 import multiprocessing 1245 1246 q = multiprocessing.Queue() 1247 q.put('knock knock') 1248 q.get(timeout=3) 1249 q.close() 1250 del q 1251 """) 1252 1253 with import_helper.DirsOnSysPath(os.getcwd()): 1254 try: 1255 __import__(module_name) 1256 except pyqueue.Empty: 1257 self.fail("Probable regression on import lock contention;" 1258 " see Issue #22853") 1259 1260 def test_timeout(self): 1261 q = multiprocessing.Queue() 1262 start = time.monotonic() 1263 self.assertRaises(pyqueue.Empty, q.get, True, 0.200) 1264 delta = time.monotonic() - start 1265 # bpo-30317: Tolerate a delta of 100 ms because of the bad clock 1266 # resolution on Windows (usually 15.6 ms). x86 Windows7 3.x once 1267 # failed because the delta was only 135.8 ms. 1268 self.assertGreaterEqual(delta, 0.100) 1269 close_queue(q) 1270 1271 def test_queue_feeder_donot_stop_onexc(self): 1272 # bpo-30414: verify feeder handles exceptions correctly 1273 if self.TYPE != 'processes': 1274 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1275 1276 class NotSerializable(object): 1277 def __reduce__(self): 1278 raise AttributeError 1279 with test.support.captured_stderr(): 1280 q = self.Queue() 1281 q.put(NotSerializable()) 1282 q.put(True) 1283 self.assertTrue(q.get(timeout=support.SHORT_TIMEOUT)) 1284 close_queue(q) 1285 1286 with test.support.captured_stderr(): 1287 # bpo-33078: verify that the queue size is correctly handled 1288 # on errors. 1289 q = self.Queue(maxsize=1) 1290 q.put(NotSerializable()) 1291 q.put(True) 1292 try: 1293 self.assertEqual(q.qsize(), 1) 1294 except NotImplementedError: 1295 # qsize is not available on all platform as it 1296 # relies on sem_getvalue 1297 pass 1298 self.assertTrue(q.get(timeout=support.SHORT_TIMEOUT)) 1299 # Check that the size of the queue is correct 1300 self.assertTrue(q.empty()) 1301 close_queue(q) 1302 1303 def test_queue_feeder_on_queue_feeder_error(self): 1304 # bpo-30006: verify feeder handles exceptions using the 1305 # _on_queue_feeder_error hook. 1306 if self.TYPE != 'processes': 1307 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1308 1309 class NotSerializable(object): 1310 """Mock unserializable object""" 1311 def __init__(self): 1312 self.reduce_was_called = False 1313 self.on_queue_feeder_error_was_called = False 1314 1315 def __reduce__(self): 1316 self.reduce_was_called = True 1317 raise AttributeError 1318 1319 class SafeQueue(multiprocessing.queues.Queue): 1320 """Queue with overloaded _on_queue_feeder_error hook""" 1321 @staticmethod 1322 def _on_queue_feeder_error(e, obj): 1323 if (isinstance(e, AttributeError) and 1324 isinstance(obj, NotSerializable)): 1325 obj.on_queue_feeder_error_was_called = True 1326 1327 not_serializable_obj = NotSerializable() 1328 # The captured_stderr reduces the noise in the test report 1329 with test.support.captured_stderr(): 1330 q = SafeQueue(ctx=multiprocessing.get_context()) 1331 q.put(not_serializable_obj) 1332 1333 # Verify that q is still functioning correctly 1334 q.put(True) 1335 self.assertTrue(q.get(timeout=support.SHORT_TIMEOUT)) 1336 1337 # Assert that the serialization and the hook have been called correctly 1338 self.assertTrue(not_serializable_obj.reduce_was_called) 1339 self.assertTrue(not_serializable_obj.on_queue_feeder_error_was_called) 1340 1341 def test_closed_queue_empty_exceptions(self): 1342 # Assert that checking the emptiness of an unused closed queue 1343 # does not raise an OSError. The rationale is that q.close() is 1344 # a no-op upon construction and becomes effective once the queue 1345 # has been used (e.g., by calling q.put()). 1346 for q in multiprocessing.Queue(), multiprocessing.JoinableQueue(): 1347 q.close() # this is a no-op since the feeder thread is None 1348 q.join_thread() # this is also a no-op 1349 self.assertTrue(q.empty()) 1350 1351 for q in multiprocessing.Queue(), multiprocessing.JoinableQueue(): 1352 q.put('foo') # make sure that the queue is 'used' 1353 q.close() # close the feeder thread 1354 q.join_thread() # make sure to join the feeder thread 1355 with self.assertRaisesRegex(OSError, 'is closed'): 1356 q.empty() 1357 1358 def test_closed_queue_put_get_exceptions(self): 1359 for q in multiprocessing.Queue(), multiprocessing.JoinableQueue(): 1360 q.close() 1361 with self.assertRaisesRegex(ValueError, 'is closed'): 1362 q.put('foo') 1363 with self.assertRaisesRegex(ValueError, 'is closed'): 1364 q.get() 1365# 1366# 1367# 1368 1369class _TestLock(BaseTestCase): 1370 1371 @staticmethod 1372 def _acquire(lock, l=None): 1373 lock.acquire() 1374 if l is not None: 1375 l.append(repr(lock)) 1376 1377 @staticmethod 1378 def _acquire_event(lock, event): 1379 lock.acquire() 1380 event.set() 1381 time.sleep(1.0) 1382 1383 def test_repr_lock(self): 1384 if self.TYPE != 'processes': 1385 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1386 1387 lock = self.Lock() 1388 self.assertEqual(f'<Lock(owner=None)>', repr(lock)) 1389 1390 lock.acquire() 1391 self.assertEqual(f'<Lock(owner=MainProcess)>', repr(lock)) 1392 lock.release() 1393 1394 tname = 'T1' 1395 l = [] 1396 t = threading.Thread(target=self._acquire, 1397 args=(lock, l), 1398 name=tname) 1399 t.start() 1400 time.sleep(0.1) 1401 self.assertEqual(f'<Lock(owner=MainProcess|{tname})>', l[0]) 1402 lock.release() 1403 1404 t = threading.Thread(target=self._acquire, 1405 args=(lock,), 1406 name=tname) 1407 t.start() 1408 time.sleep(0.1) 1409 self.assertEqual('<Lock(owner=SomeOtherThread)>', repr(lock)) 1410 lock.release() 1411 1412 pname = 'P1' 1413 l = multiprocessing.Manager().list() 1414 p = self.Process(target=self._acquire, 1415 args=(lock, l), 1416 name=pname) 1417 p.start() 1418 p.join() 1419 self.assertEqual(f'<Lock(owner={pname})>', l[0]) 1420 1421 lock = self.Lock() 1422 event = self.Event() 1423 p = self.Process(target=self._acquire_event, 1424 args=(lock, event), 1425 name='P2') 1426 p.start() 1427 event.wait() 1428 self.assertEqual(f'<Lock(owner=SomeOtherProcess)>', repr(lock)) 1429 p.terminate() 1430 1431 def test_lock(self): 1432 lock = self.Lock() 1433 self.assertEqual(lock.acquire(), True) 1434 self.assertEqual(lock.acquire(False), False) 1435 self.assertEqual(lock.release(), None) 1436 self.assertRaises((ValueError, threading.ThreadError), lock.release) 1437 1438 @staticmethod 1439 def _acquire_release(lock, timeout, l=None, n=1): 1440 for _ in range(n): 1441 lock.acquire() 1442 if l is not None: 1443 l.append(repr(lock)) 1444 time.sleep(timeout) 1445 for _ in range(n): 1446 lock.release() 1447 1448 def test_repr_rlock(self): 1449 if self.TYPE != 'processes': 1450 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1451 1452 lock = self.RLock() 1453 self.assertEqual('<RLock(None, 0)>', repr(lock)) 1454 1455 n = 3 1456 for _ in range(n): 1457 lock.acquire() 1458 self.assertEqual(f'<RLock(MainProcess, {n})>', repr(lock)) 1459 for _ in range(n): 1460 lock.release() 1461 1462 t, l = [], [] 1463 for i in range(n): 1464 t.append(threading.Thread(target=self._acquire_release, 1465 args=(lock, 0.1, l, i+1), 1466 name=f'T{i+1}')) 1467 t[-1].start() 1468 for t_ in t: 1469 t_.join() 1470 for i in range(n): 1471 self.assertIn(f'<RLock(MainProcess|T{i+1}, {i+1})>', l) 1472 1473 1474 t = threading.Thread(target=self._acquire_release, 1475 args=(lock, 0.2), 1476 name=f'T1') 1477 t.start() 1478 time.sleep(0.1) 1479 self.assertEqual('<RLock(SomeOtherThread, nonzero)>', repr(lock)) 1480 time.sleep(0.2) 1481 1482 pname = 'P1' 1483 l = multiprocessing.Manager().list() 1484 p = self.Process(target=self._acquire_release, 1485 args=(lock, 0.1, l), 1486 name=pname) 1487 p.start() 1488 p.join() 1489 self.assertEqual(f'<RLock({pname}, 1)>', l[0]) 1490 1491 event = self.Event() 1492 lock = self.RLock() 1493 p = self.Process(target=self._acquire_event, 1494 args=(lock, event)) 1495 p.start() 1496 event.wait() 1497 self.assertEqual('<RLock(SomeOtherProcess, nonzero)>', repr(lock)) 1498 p.join() 1499 1500 def test_rlock(self): 1501 lock = self.RLock() 1502 self.assertEqual(lock.acquire(), True) 1503 self.assertEqual(lock.acquire(), True) 1504 self.assertEqual(lock.acquire(), True) 1505 self.assertEqual(lock.release(), None) 1506 self.assertEqual(lock.release(), None) 1507 self.assertEqual(lock.release(), None) 1508 self.assertRaises((AssertionError, RuntimeError), lock.release) 1509 1510 def test_lock_context(self): 1511 with self.Lock(): 1512 pass 1513 1514 1515class _TestSemaphore(BaseTestCase): 1516 1517 def _test_semaphore(self, sem): 1518 self.assertReturnsIfImplemented(2, get_value, sem) 1519 self.assertEqual(sem.acquire(), True) 1520 self.assertReturnsIfImplemented(1, get_value, sem) 1521 self.assertEqual(sem.acquire(), True) 1522 self.assertReturnsIfImplemented(0, get_value, sem) 1523 self.assertEqual(sem.acquire(False), False) 1524 self.assertReturnsIfImplemented(0, get_value, sem) 1525 self.assertEqual(sem.release(), None) 1526 self.assertReturnsIfImplemented(1, get_value, sem) 1527 self.assertEqual(sem.release(), None) 1528 self.assertReturnsIfImplemented(2, get_value, sem) 1529 1530 def test_semaphore(self): 1531 sem = self.Semaphore(2) 1532 self._test_semaphore(sem) 1533 self.assertEqual(sem.release(), None) 1534 self.assertReturnsIfImplemented(3, get_value, sem) 1535 self.assertEqual(sem.release(), None) 1536 self.assertReturnsIfImplemented(4, get_value, sem) 1537 1538 def test_bounded_semaphore(self): 1539 sem = self.BoundedSemaphore(2) 1540 self._test_semaphore(sem) 1541 # Currently fails on OS/X 1542 #if HAVE_GETVALUE: 1543 # self.assertRaises(ValueError, sem.release) 1544 # self.assertReturnsIfImplemented(2, get_value, sem) 1545 1546 def test_timeout(self): 1547 if self.TYPE != 'processes': 1548 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1549 1550 sem = self.Semaphore(0) 1551 acquire = TimingWrapper(sem.acquire) 1552 1553 self.assertEqual(acquire(False), False) 1554 self.assertTimingAlmostEqual(acquire.elapsed, 0.0) 1555 1556 self.assertEqual(acquire(False, None), False) 1557 self.assertTimingAlmostEqual(acquire.elapsed, 0.0) 1558 1559 self.assertEqual(acquire(False, TIMEOUT1), False) 1560 self.assertTimingAlmostEqual(acquire.elapsed, 0) 1561 1562 self.assertEqual(acquire(True, TIMEOUT2), False) 1563 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2) 1564 1565 self.assertEqual(acquire(timeout=TIMEOUT3), False) 1566 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3) 1567 1568 1569class _TestCondition(BaseTestCase): 1570 1571 @classmethod 1572 def f(cls, cond, sleeping, woken, timeout=None): 1573 cond.acquire() 1574 sleeping.release() 1575 cond.wait(timeout) 1576 woken.release() 1577 cond.release() 1578 1579 def assertReachesEventually(self, func, value): 1580 for i in range(10): 1581 try: 1582 if func() == value: 1583 break 1584 except NotImplementedError: 1585 break 1586 time.sleep(DELTA) 1587 time.sleep(DELTA) 1588 self.assertReturnsIfImplemented(value, func) 1589 1590 def check_invariant(self, cond): 1591 # this is only supposed to succeed when there are no sleepers 1592 if self.TYPE == 'processes': 1593 try: 1594 sleepers = (cond._sleeping_count.get_value() - 1595 cond._woken_count.get_value()) 1596 self.assertEqual(sleepers, 0) 1597 self.assertEqual(cond._wait_semaphore.get_value(), 0) 1598 except NotImplementedError: 1599 pass 1600 1601 def test_notify(self): 1602 cond = self.Condition() 1603 sleeping = self.Semaphore(0) 1604 woken = self.Semaphore(0) 1605 1606 p = self.Process(target=self.f, args=(cond, sleeping, woken)) 1607 p.daemon = True 1608 p.start() 1609 self.addCleanup(p.join) 1610 1611 p = threading.Thread(target=self.f, args=(cond, sleeping, woken)) 1612 p.daemon = True 1613 p.start() 1614 self.addCleanup(p.join) 1615 1616 # wait for both children to start sleeping 1617 sleeping.acquire() 1618 sleeping.acquire() 1619 1620 # check no process/thread has woken up 1621 time.sleep(DELTA) 1622 self.assertReturnsIfImplemented(0, get_value, woken) 1623 1624 # wake up one process/thread 1625 cond.acquire() 1626 cond.notify() 1627 cond.release() 1628 1629 # check one process/thread has woken up 1630 time.sleep(DELTA) 1631 self.assertReturnsIfImplemented(1, get_value, woken) 1632 1633 # wake up another 1634 cond.acquire() 1635 cond.notify() 1636 cond.release() 1637 1638 # check other has woken up 1639 time.sleep(DELTA) 1640 self.assertReturnsIfImplemented(2, get_value, woken) 1641 1642 # check state is not mucked up 1643 self.check_invariant(cond) 1644 p.join() 1645 1646 def test_notify_all(self): 1647 cond = self.Condition() 1648 sleeping = self.Semaphore(0) 1649 woken = self.Semaphore(0) 1650 1651 # start some threads/processes which will timeout 1652 for i in range(3): 1653 p = self.Process(target=self.f, 1654 args=(cond, sleeping, woken, TIMEOUT1)) 1655 p.daemon = True 1656 p.start() 1657 self.addCleanup(p.join) 1658 1659 t = threading.Thread(target=self.f, 1660 args=(cond, sleeping, woken, TIMEOUT1)) 1661 t.daemon = True 1662 t.start() 1663 self.addCleanup(t.join) 1664 1665 # wait for them all to sleep 1666 for i in range(6): 1667 sleeping.acquire() 1668 1669 # check they have all timed out 1670 for i in range(6): 1671 woken.acquire() 1672 self.assertReturnsIfImplemented(0, get_value, woken) 1673 1674 # check state is not mucked up 1675 self.check_invariant(cond) 1676 1677 # start some more threads/processes 1678 for i in range(3): 1679 p = self.Process(target=self.f, args=(cond, sleeping, woken)) 1680 p.daemon = True 1681 p.start() 1682 self.addCleanup(p.join) 1683 1684 t = threading.Thread(target=self.f, args=(cond, sleeping, woken)) 1685 t.daemon = True 1686 t.start() 1687 self.addCleanup(t.join) 1688 1689 # wait for them to all sleep 1690 for i in range(6): 1691 sleeping.acquire() 1692 1693 # check no process/thread has woken up 1694 time.sleep(DELTA) 1695 self.assertReturnsIfImplemented(0, get_value, woken) 1696 1697 # wake them all up 1698 cond.acquire() 1699 cond.notify_all() 1700 cond.release() 1701 1702 # check they have all woken 1703 self.assertReachesEventually(lambda: get_value(woken), 6) 1704 1705 # check state is not mucked up 1706 self.check_invariant(cond) 1707 1708 def test_notify_n(self): 1709 cond = self.Condition() 1710 sleeping = self.Semaphore(0) 1711 woken = self.Semaphore(0) 1712 1713 # start some threads/processes 1714 for i in range(3): 1715 p = self.Process(target=self.f, args=(cond, sleeping, woken)) 1716 p.daemon = True 1717 p.start() 1718 self.addCleanup(p.join) 1719 1720 t = threading.Thread(target=self.f, args=(cond, sleeping, woken)) 1721 t.daemon = True 1722 t.start() 1723 self.addCleanup(t.join) 1724 1725 # wait for them to all sleep 1726 for i in range(6): 1727 sleeping.acquire() 1728 1729 # check no process/thread has woken up 1730 time.sleep(DELTA) 1731 self.assertReturnsIfImplemented(0, get_value, woken) 1732 1733 # wake some of them up 1734 cond.acquire() 1735 cond.notify(n=2) 1736 cond.release() 1737 1738 # check 2 have woken 1739 self.assertReachesEventually(lambda: get_value(woken), 2) 1740 1741 # wake the rest of them 1742 cond.acquire() 1743 cond.notify(n=4) 1744 cond.release() 1745 1746 self.assertReachesEventually(lambda: get_value(woken), 6) 1747 1748 # doesn't do anything more 1749 cond.acquire() 1750 cond.notify(n=3) 1751 cond.release() 1752 1753 self.assertReturnsIfImplemented(6, get_value, woken) 1754 1755 # check state is not mucked up 1756 self.check_invariant(cond) 1757 1758 def test_timeout(self): 1759 cond = self.Condition() 1760 wait = TimingWrapper(cond.wait) 1761 cond.acquire() 1762 res = wait(TIMEOUT1) 1763 cond.release() 1764 self.assertEqual(res, False) 1765 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1) 1766 1767 @classmethod 1768 def _test_waitfor_f(cls, cond, state): 1769 with cond: 1770 state.value = 0 1771 cond.notify() 1772 result = cond.wait_for(lambda : state.value==4) 1773 if not result or state.value != 4: 1774 sys.exit(1) 1775 1776 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes') 1777 def test_waitfor(self): 1778 # based on test in test/lock_tests.py 1779 cond = self.Condition() 1780 state = self.Value('i', -1) 1781 1782 p = self.Process(target=self._test_waitfor_f, args=(cond, state)) 1783 p.daemon = True 1784 p.start() 1785 1786 with cond: 1787 result = cond.wait_for(lambda : state.value==0) 1788 self.assertTrue(result) 1789 self.assertEqual(state.value, 0) 1790 1791 for i in range(4): 1792 time.sleep(0.01) 1793 with cond: 1794 state.value += 1 1795 cond.notify() 1796 1797 join_process(p) 1798 self.assertEqual(p.exitcode, 0) 1799 1800 @classmethod 1801 def _test_waitfor_timeout_f(cls, cond, state, success, sem): 1802 sem.release() 1803 with cond: 1804 expected = 0.100 1805 dt = time.monotonic() 1806 result = cond.wait_for(lambda : state.value==4, timeout=expected) 1807 dt = time.monotonic() - dt 1808 if not result and (expected - CLOCK_RES) <= dt: 1809 success.value = True 1810 1811 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes') 1812 def test_waitfor_timeout(self): 1813 # based on test in test/lock_tests.py 1814 cond = self.Condition() 1815 state = self.Value('i', 0) 1816 success = self.Value('i', False) 1817 sem = self.Semaphore(0) 1818 1819 p = self.Process(target=self._test_waitfor_timeout_f, 1820 args=(cond, state, success, sem)) 1821 p.daemon = True 1822 p.start() 1823 self.assertTrue(sem.acquire(timeout=support.LONG_TIMEOUT)) 1824 1825 # Only increment 3 times, so state == 4 is never reached. 1826 for i in range(3): 1827 time.sleep(0.010) 1828 with cond: 1829 state.value += 1 1830 cond.notify() 1831 1832 join_process(p) 1833 self.assertTrue(success.value) 1834 1835 @classmethod 1836 def _test_wait_result(cls, c, pid): 1837 with c: 1838 c.notify() 1839 time.sleep(1) 1840 if pid is not None: 1841 os.kill(pid, signal.SIGINT) 1842 1843 def test_wait_result(self): 1844 if isinstance(self, ProcessesMixin) and sys.platform != 'win32': 1845 pid = os.getpid() 1846 else: 1847 pid = None 1848 1849 c = self.Condition() 1850 with c: 1851 self.assertFalse(c.wait(0)) 1852 self.assertFalse(c.wait(0.1)) 1853 1854 p = self.Process(target=self._test_wait_result, args=(c, pid)) 1855 p.start() 1856 1857 self.assertTrue(c.wait(60)) 1858 if pid is not None: 1859 self.assertRaises(KeyboardInterrupt, c.wait, 60) 1860 1861 p.join() 1862 1863 1864class _TestEvent(BaseTestCase): 1865 1866 @classmethod 1867 def _test_event(cls, event): 1868 time.sleep(TIMEOUT2) 1869 event.set() 1870 1871 def test_event(self): 1872 event = self.Event() 1873 wait = TimingWrapper(event.wait) 1874 1875 # Removed temporarily, due to API shear, this does not 1876 # work with threading._Event objects. is_set == isSet 1877 self.assertEqual(event.is_set(), False) 1878 1879 # Removed, threading.Event.wait() will return the value of the __flag 1880 # instead of None. API Shear with the semaphore backed mp.Event 1881 self.assertEqual(wait(0.0), False) 1882 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 1883 self.assertEqual(wait(TIMEOUT1), False) 1884 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1) 1885 1886 event.set() 1887 1888 # See note above on the API differences 1889 self.assertEqual(event.is_set(), True) 1890 self.assertEqual(wait(), True) 1891 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 1892 self.assertEqual(wait(TIMEOUT1), True) 1893 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 1894 # self.assertEqual(event.is_set(), True) 1895 1896 event.clear() 1897 1898 #self.assertEqual(event.is_set(), False) 1899 1900 p = self.Process(target=self._test_event, args=(event,)) 1901 p.daemon = True 1902 p.start() 1903 self.assertEqual(wait(), True) 1904 p.join() 1905 1906 def test_repr(self) -> None: 1907 event = self.Event() 1908 if self.TYPE == 'processes': 1909 self.assertRegex(repr(event), r"<Event at .* unset>") 1910 event.set() 1911 self.assertRegex(repr(event), r"<Event at .* set>") 1912 event.clear() 1913 self.assertRegex(repr(event), r"<Event at .* unset>") 1914 elif self.TYPE == 'manager': 1915 self.assertRegex(repr(event), r"<EventProxy object, typeid 'Event' at .*") 1916 event.set() 1917 self.assertRegex(repr(event), r"<EventProxy object, typeid 'Event' at .*") 1918 1919 1920# Tests for Barrier - adapted from tests in test/lock_tests.py 1921# 1922 1923# Many of the tests for threading.Barrier use a list as an atomic 1924# counter: a value is appended to increment the counter, and the 1925# length of the list gives the value. We use the class DummyList 1926# for the same purpose. 1927 1928class _DummyList(object): 1929 1930 def __init__(self): 1931 wrapper = multiprocessing.heap.BufferWrapper(struct.calcsize('i')) 1932 lock = multiprocessing.Lock() 1933 self.__setstate__((wrapper, lock)) 1934 self._lengthbuf[0] = 0 1935 1936 def __setstate__(self, state): 1937 (self._wrapper, self._lock) = state 1938 self._lengthbuf = self._wrapper.create_memoryview().cast('i') 1939 1940 def __getstate__(self): 1941 return (self._wrapper, self._lock) 1942 1943 def append(self, _): 1944 with self._lock: 1945 self._lengthbuf[0] += 1 1946 1947 def __len__(self): 1948 with self._lock: 1949 return self._lengthbuf[0] 1950 1951def _wait(): 1952 # A crude wait/yield function not relying on synchronization primitives. 1953 time.sleep(0.01) 1954 1955 1956class Bunch(object): 1957 """ 1958 A bunch of threads. 1959 """ 1960 def __init__(self, namespace, f, args, n, wait_before_exit=False): 1961 """ 1962 Construct a bunch of `n` threads running the same function `f`. 1963 If `wait_before_exit` is True, the threads won't terminate until 1964 do_finish() is called. 1965 """ 1966 self.f = f 1967 self.args = args 1968 self.n = n 1969 self.started = namespace.DummyList() 1970 self.finished = namespace.DummyList() 1971 self._can_exit = namespace.Event() 1972 if not wait_before_exit: 1973 self._can_exit.set() 1974 1975 threads = [] 1976 for i in range(n): 1977 p = namespace.Process(target=self.task) 1978 p.daemon = True 1979 p.start() 1980 threads.append(p) 1981 1982 def finalize(threads): 1983 for p in threads: 1984 p.join() 1985 1986 self._finalizer = weakref.finalize(self, finalize, threads) 1987 1988 def task(self): 1989 pid = os.getpid() 1990 self.started.append(pid) 1991 try: 1992 self.f(*self.args) 1993 finally: 1994 self.finished.append(pid) 1995 self._can_exit.wait(30) 1996 assert self._can_exit.is_set() 1997 1998 def wait_for_started(self): 1999 while len(self.started) < self.n: 2000 _wait() 2001 2002 def wait_for_finished(self): 2003 while len(self.finished) < self.n: 2004 _wait() 2005 2006 def do_finish(self): 2007 self._can_exit.set() 2008 2009 def close(self): 2010 self._finalizer() 2011 2012 2013class AppendTrue(object): 2014 def __init__(self, obj): 2015 self.obj = obj 2016 def __call__(self): 2017 self.obj.append(True) 2018 2019 2020class _TestBarrier(BaseTestCase): 2021 """ 2022 Tests for Barrier objects. 2023 """ 2024 N = 5 2025 defaultTimeout = 30.0 # XXX Slow Windows buildbots need generous timeout 2026 2027 def setUp(self): 2028 self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout) 2029 2030 def tearDown(self): 2031 self.barrier.abort() 2032 self.barrier = None 2033 2034 def DummyList(self): 2035 if self.TYPE == 'threads': 2036 return [] 2037 elif self.TYPE == 'manager': 2038 return self.manager.list() 2039 else: 2040 return _DummyList() 2041 2042 def run_threads(self, f, args): 2043 b = Bunch(self, f, args, self.N-1) 2044 try: 2045 f(*args) 2046 b.wait_for_finished() 2047 finally: 2048 b.close() 2049 2050 @classmethod 2051 def multipass(cls, barrier, results, n): 2052 m = barrier.parties 2053 assert m == cls.N 2054 for i in range(n): 2055 results[0].append(True) 2056 assert len(results[1]) == i * m 2057 barrier.wait() 2058 results[1].append(True) 2059 assert len(results[0]) == (i + 1) * m 2060 barrier.wait() 2061 try: 2062 assert barrier.n_waiting == 0 2063 except NotImplementedError: 2064 pass 2065 assert not barrier.broken 2066 2067 def test_barrier(self, passes=1): 2068 """ 2069 Test that a barrier is passed in lockstep 2070 """ 2071 results = [self.DummyList(), self.DummyList()] 2072 self.run_threads(self.multipass, (self.barrier, results, passes)) 2073 2074 def test_barrier_10(self): 2075 """ 2076 Test that a barrier works for 10 consecutive runs 2077 """ 2078 return self.test_barrier(10) 2079 2080 @classmethod 2081 def _test_wait_return_f(cls, barrier, queue): 2082 res = barrier.wait() 2083 queue.put(res) 2084 2085 def test_wait_return(self): 2086 """ 2087 test the return value from barrier.wait 2088 """ 2089 queue = self.Queue() 2090 self.run_threads(self._test_wait_return_f, (self.barrier, queue)) 2091 results = [queue.get() for i in range(self.N)] 2092 self.assertEqual(results.count(0), 1) 2093 close_queue(queue) 2094 2095 @classmethod 2096 def _test_action_f(cls, barrier, results): 2097 barrier.wait() 2098 if len(results) != 1: 2099 raise RuntimeError 2100 2101 def test_action(self): 2102 """ 2103 Test the 'action' callback 2104 """ 2105 results = self.DummyList() 2106 barrier = self.Barrier(self.N, action=AppendTrue(results)) 2107 self.run_threads(self._test_action_f, (barrier, results)) 2108 self.assertEqual(len(results), 1) 2109 2110 @classmethod 2111 def _test_abort_f(cls, barrier, results1, results2): 2112 try: 2113 i = barrier.wait() 2114 if i == cls.N//2: 2115 raise RuntimeError 2116 barrier.wait() 2117 results1.append(True) 2118 except threading.BrokenBarrierError: 2119 results2.append(True) 2120 except RuntimeError: 2121 barrier.abort() 2122 2123 def test_abort(self): 2124 """ 2125 Test that an abort will put the barrier in a broken state 2126 """ 2127 results1 = self.DummyList() 2128 results2 = self.DummyList() 2129 self.run_threads(self._test_abort_f, 2130 (self.barrier, results1, results2)) 2131 self.assertEqual(len(results1), 0) 2132 self.assertEqual(len(results2), self.N-1) 2133 self.assertTrue(self.barrier.broken) 2134 2135 @classmethod 2136 def _test_reset_f(cls, barrier, results1, results2, results3): 2137 i = barrier.wait() 2138 if i == cls.N//2: 2139 # Wait until the other threads are all in the barrier. 2140 while barrier.n_waiting < cls.N-1: 2141 time.sleep(0.001) 2142 barrier.reset() 2143 else: 2144 try: 2145 barrier.wait() 2146 results1.append(True) 2147 except threading.BrokenBarrierError: 2148 results2.append(True) 2149 # Now, pass the barrier again 2150 barrier.wait() 2151 results3.append(True) 2152 2153 def test_reset(self): 2154 """ 2155 Test that a 'reset' on a barrier frees the waiting threads 2156 """ 2157 results1 = self.DummyList() 2158 results2 = self.DummyList() 2159 results3 = self.DummyList() 2160 self.run_threads(self._test_reset_f, 2161 (self.barrier, results1, results2, results3)) 2162 self.assertEqual(len(results1), 0) 2163 self.assertEqual(len(results2), self.N-1) 2164 self.assertEqual(len(results3), self.N) 2165 2166 @classmethod 2167 def _test_abort_and_reset_f(cls, barrier, barrier2, 2168 results1, results2, results3): 2169 try: 2170 i = barrier.wait() 2171 if i == cls.N//2: 2172 raise RuntimeError 2173 barrier.wait() 2174 results1.append(True) 2175 except threading.BrokenBarrierError: 2176 results2.append(True) 2177 except RuntimeError: 2178 barrier.abort() 2179 # Synchronize and reset the barrier. Must synchronize first so 2180 # that everyone has left it when we reset, and after so that no 2181 # one enters it before the reset. 2182 if barrier2.wait() == cls.N//2: 2183 barrier.reset() 2184 barrier2.wait() 2185 barrier.wait() 2186 results3.append(True) 2187 2188 def test_abort_and_reset(self): 2189 """ 2190 Test that a barrier can be reset after being broken. 2191 """ 2192 results1 = self.DummyList() 2193 results2 = self.DummyList() 2194 results3 = self.DummyList() 2195 barrier2 = self.Barrier(self.N) 2196 2197 self.run_threads(self._test_abort_and_reset_f, 2198 (self.barrier, barrier2, results1, results2, results3)) 2199 self.assertEqual(len(results1), 0) 2200 self.assertEqual(len(results2), self.N-1) 2201 self.assertEqual(len(results3), self.N) 2202 2203 @classmethod 2204 def _test_timeout_f(cls, barrier, results): 2205 i = barrier.wait() 2206 if i == cls.N//2: 2207 # One thread is late! 2208 time.sleep(1.0) 2209 try: 2210 barrier.wait(0.5) 2211 except threading.BrokenBarrierError: 2212 results.append(True) 2213 2214 def test_timeout(self): 2215 """ 2216 Test wait(timeout) 2217 """ 2218 results = self.DummyList() 2219 self.run_threads(self._test_timeout_f, (self.barrier, results)) 2220 self.assertEqual(len(results), self.barrier.parties) 2221 2222 @classmethod 2223 def _test_default_timeout_f(cls, barrier, results): 2224 i = barrier.wait(cls.defaultTimeout) 2225 if i == cls.N//2: 2226 # One thread is later than the default timeout 2227 time.sleep(1.0) 2228 try: 2229 barrier.wait() 2230 except threading.BrokenBarrierError: 2231 results.append(True) 2232 2233 def test_default_timeout(self): 2234 """ 2235 Test the barrier's default timeout 2236 """ 2237 barrier = self.Barrier(self.N, timeout=0.5) 2238 results = self.DummyList() 2239 self.run_threads(self._test_default_timeout_f, (barrier, results)) 2240 self.assertEqual(len(results), barrier.parties) 2241 2242 def test_single_thread(self): 2243 b = self.Barrier(1) 2244 b.wait() 2245 b.wait() 2246 2247 @classmethod 2248 def _test_thousand_f(cls, barrier, passes, conn, lock): 2249 for i in range(passes): 2250 barrier.wait() 2251 with lock: 2252 conn.send(i) 2253 2254 def test_thousand(self): 2255 if self.TYPE == 'manager': 2256 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 2257 passes = 1000 2258 lock = self.Lock() 2259 conn, child_conn = self.Pipe(False) 2260 for j in range(self.N): 2261 p = self.Process(target=self._test_thousand_f, 2262 args=(self.barrier, passes, child_conn, lock)) 2263 p.start() 2264 self.addCleanup(p.join) 2265 2266 for i in range(passes): 2267 for j in range(self.N): 2268 self.assertEqual(conn.recv(), i) 2269 2270# 2271# 2272# 2273 2274class _TestValue(BaseTestCase): 2275 2276 ALLOWED_TYPES = ('processes',) 2277 2278 codes_values = [ 2279 ('i', 4343, 24234), 2280 ('d', 3.625, -4.25), 2281 ('h', -232, 234), 2282 ('q', 2 ** 33, 2 ** 34), 2283 ('c', latin('x'), latin('y')) 2284 ] 2285 2286 def setUp(self): 2287 if not HAS_SHAREDCTYPES: 2288 self.skipTest("requires multiprocessing.sharedctypes") 2289 2290 @classmethod 2291 def _test(cls, values): 2292 for sv, cv in zip(values, cls.codes_values): 2293 sv.value = cv[2] 2294 2295 2296 def test_value(self, raw=False): 2297 if raw: 2298 values = [self.RawValue(code, value) 2299 for code, value, _ in self.codes_values] 2300 else: 2301 values = [self.Value(code, value) 2302 for code, value, _ in self.codes_values] 2303 2304 for sv, cv in zip(values, self.codes_values): 2305 self.assertEqual(sv.value, cv[1]) 2306 2307 proc = self.Process(target=self._test, args=(values,)) 2308 proc.daemon = True 2309 proc.start() 2310 proc.join() 2311 2312 for sv, cv in zip(values, self.codes_values): 2313 self.assertEqual(sv.value, cv[2]) 2314 2315 def test_rawvalue(self): 2316 self.test_value(raw=True) 2317 2318 def test_getobj_getlock(self): 2319 val1 = self.Value('i', 5) 2320 lock1 = val1.get_lock() 2321 obj1 = val1.get_obj() 2322 2323 val2 = self.Value('i', 5, lock=None) 2324 lock2 = val2.get_lock() 2325 obj2 = val2.get_obj() 2326 2327 lock = self.Lock() 2328 val3 = self.Value('i', 5, lock=lock) 2329 lock3 = val3.get_lock() 2330 obj3 = val3.get_obj() 2331 self.assertEqual(lock, lock3) 2332 2333 arr4 = self.Value('i', 5, lock=False) 2334 self.assertFalse(hasattr(arr4, 'get_lock')) 2335 self.assertFalse(hasattr(arr4, 'get_obj')) 2336 2337 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue') 2338 2339 arr5 = self.RawValue('i', 5) 2340 self.assertFalse(hasattr(arr5, 'get_lock')) 2341 self.assertFalse(hasattr(arr5, 'get_obj')) 2342 2343 2344class _TestArray(BaseTestCase): 2345 2346 ALLOWED_TYPES = ('processes',) 2347 2348 @classmethod 2349 def f(cls, seq): 2350 for i in range(1, len(seq)): 2351 seq[i] += seq[i-1] 2352 2353 @unittest.skipIf(c_int is None, "requires _ctypes") 2354 def test_array(self, raw=False): 2355 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831] 2356 if raw: 2357 arr = self.RawArray('i', seq) 2358 else: 2359 arr = self.Array('i', seq) 2360 2361 self.assertEqual(len(arr), len(seq)) 2362 self.assertEqual(arr[3], seq[3]) 2363 self.assertEqual(list(arr[2:7]), list(seq[2:7])) 2364 2365 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4]) 2366 2367 self.assertEqual(list(arr[:]), seq) 2368 2369 self.f(seq) 2370 2371 p = self.Process(target=self.f, args=(arr,)) 2372 p.daemon = True 2373 p.start() 2374 p.join() 2375 2376 self.assertEqual(list(arr[:]), seq) 2377 2378 @unittest.skipIf(c_int is None, "requires _ctypes") 2379 def test_array_from_size(self): 2380 size = 10 2381 # Test for zeroing (see issue #11675). 2382 # The repetition below strengthens the test by increasing the chances 2383 # of previously allocated non-zero memory being used for the new array 2384 # on the 2nd and 3rd loops. 2385 for _ in range(3): 2386 arr = self.Array('i', size) 2387 self.assertEqual(len(arr), size) 2388 self.assertEqual(list(arr), [0] * size) 2389 arr[:] = range(10) 2390 self.assertEqual(list(arr), list(range(10))) 2391 del arr 2392 2393 @unittest.skipIf(c_int is None, "requires _ctypes") 2394 def test_rawarray(self): 2395 self.test_array(raw=True) 2396 2397 @unittest.skipIf(c_int is None, "requires _ctypes") 2398 def test_getobj_getlock_obj(self): 2399 arr1 = self.Array('i', list(range(10))) 2400 lock1 = arr1.get_lock() 2401 obj1 = arr1.get_obj() 2402 2403 arr2 = self.Array('i', list(range(10)), lock=None) 2404 lock2 = arr2.get_lock() 2405 obj2 = arr2.get_obj() 2406 2407 lock = self.Lock() 2408 arr3 = self.Array('i', list(range(10)), lock=lock) 2409 lock3 = arr3.get_lock() 2410 obj3 = arr3.get_obj() 2411 self.assertEqual(lock, lock3) 2412 2413 arr4 = self.Array('i', range(10), lock=False) 2414 self.assertFalse(hasattr(arr4, 'get_lock')) 2415 self.assertFalse(hasattr(arr4, 'get_obj')) 2416 self.assertRaises(AttributeError, 2417 self.Array, 'i', range(10), lock='notalock') 2418 2419 arr5 = self.RawArray('i', range(10)) 2420 self.assertFalse(hasattr(arr5, 'get_lock')) 2421 self.assertFalse(hasattr(arr5, 'get_obj')) 2422 2423# 2424# 2425# 2426 2427class _TestContainers(BaseTestCase): 2428 2429 ALLOWED_TYPES = ('manager',) 2430 2431 def test_list(self): 2432 a = self.list(list(range(10))) 2433 self.assertEqual(a[:], list(range(10))) 2434 2435 b = self.list() 2436 self.assertEqual(b[:], []) 2437 2438 b.extend(list(range(5))) 2439 self.assertEqual(b[:], list(range(5))) 2440 2441 self.assertEqual(b[2], 2) 2442 self.assertEqual(b[2:10], [2,3,4]) 2443 2444 b *= 2 2445 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]) 2446 2447 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6]) 2448 2449 self.assertEqual(a[:], list(range(10))) 2450 2451 d = [a, b] 2452 e = self.list(d) 2453 self.assertEqual( 2454 [element[:] for element in e], 2455 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]] 2456 ) 2457 2458 f = self.list([a]) 2459 a.append('hello') 2460 self.assertEqual(f[0][:], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']) 2461 2462 def test_list_iter(self): 2463 a = self.list(list(range(10))) 2464 it = iter(a) 2465 self.assertEqual(list(it), list(range(10))) 2466 self.assertEqual(list(it), []) # exhausted 2467 # list modified during iteration 2468 it = iter(a) 2469 a[0] = 100 2470 self.assertEqual(next(it), 100) 2471 2472 def test_list_proxy_in_list(self): 2473 a = self.list([self.list(range(3)) for _i in range(3)]) 2474 self.assertEqual([inner[:] for inner in a], [[0, 1, 2]] * 3) 2475 2476 a[0][-1] = 55 2477 self.assertEqual(a[0][:], [0, 1, 55]) 2478 for i in range(1, 3): 2479 self.assertEqual(a[i][:], [0, 1, 2]) 2480 2481 self.assertEqual(a[1].pop(), 2) 2482 self.assertEqual(len(a[1]), 2) 2483 for i in range(0, 3, 2): 2484 self.assertEqual(len(a[i]), 3) 2485 2486 del a 2487 2488 b = self.list() 2489 b.append(b) 2490 del b 2491 2492 def test_dict(self): 2493 d = self.dict() 2494 indices = list(range(65, 70)) 2495 for i in indices: 2496 d[i] = chr(i) 2497 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices)) 2498 self.assertEqual(sorted(d.keys()), indices) 2499 self.assertEqual(sorted(d.values()), [chr(i) for i in indices]) 2500 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices]) 2501 2502 def test_dict_iter(self): 2503 d = self.dict() 2504 indices = list(range(65, 70)) 2505 for i in indices: 2506 d[i] = chr(i) 2507 it = iter(d) 2508 self.assertEqual(list(it), indices) 2509 self.assertEqual(list(it), []) # exhausted 2510 # dictionary changed size during iteration 2511 it = iter(d) 2512 d.clear() 2513 self.assertRaises(RuntimeError, next, it) 2514 2515 def test_dict_proxy_nested(self): 2516 pets = self.dict(ferrets=2, hamsters=4) 2517 supplies = self.dict(water=10, feed=3) 2518 d = self.dict(pets=pets, supplies=supplies) 2519 2520 self.assertEqual(supplies['water'], 10) 2521 self.assertEqual(d['supplies']['water'], 10) 2522 2523 d['supplies']['blankets'] = 5 2524 self.assertEqual(supplies['blankets'], 5) 2525 self.assertEqual(d['supplies']['blankets'], 5) 2526 2527 d['supplies']['water'] = 7 2528 self.assertEqual(supplies['water'], 7) 2529 self.assertEqual(d['supplies']['water'], 7) 2530 2531 del pets 2532 del supplies 2533 self.assertEqual(d['pets']['ferrets'], 2) 2534 d['supplies']['blankets'] = 11 2535 self.assertEqual(d['supplies']['blankets'], 11) 2536 2537 pets = d['pets'] 2538 supplies = d['supplies'] 2539 supplies['water'] = 7 2540 self.assertEqual(supplies['water'], 7) 2541 self.assertEqual(d['supplies']['water'], 7) 2542 2543 d.clear() 2544 self.assertEqual(len(d), 0) 2545 self.assertEqual(supplies['water'], 7) 2546 self.assertEqual(pets['hamsters'], 4) 2547 2548 l = self.list([pets, supplies]) 2549 l[0]['marmots'] = 1 2550 self.assertEqual(pets['marmots'], 1) 2551 self.assertEqual(l[0]['marmots'], 1) 2552 2553 del pets 2554 del supplies 2555 self.assertEqual(l[0]['marmots'], 1) 2556 2557 outer = self.list([[88, 99], l]) 2558 self.assertIsInstance(outer[0], list) # Not a ListProxy 2559 self.assertEqual(outer[-1][-1]['feed'], 3) 2560 2561 def test_nested_queue(self): 2562 a = self.list() # Test queue inside list 2563 a.append(self.Queue()) 2564 a[0].put(123) 2565 self.assertEqual(a[0].get(), 123) 2566 b = self.dict() # Test queue inside dict 2567 b[0] = self.Queue() 2568 b[0].put(456) 2569 self.assertEqual(b[0].get(), 456) 2570 2571 def test_namespace(self): 2572 n = self.Namespace() 2573 n.name = 'Bob' 2574 n.job = 'Builder' 2575 n._hidden = 'hidden' 2576 self.assertEqual((n.name, n.job), ('Bob', 'Builder')) 2577 del n.job 2578 self.assertEqual(str(n), "Namespace(name='Bob')") 2579 self.assertTrue(hasattr(n, 'name')) 2580 self.assertTrue(not hasattr(n, 'job')) 2581 2582# 2583# 2584# 2585 2586def sqr(x, wait=0.0, event=None): 2587 if event is None: 2588 time.sleep(wait) 2589 else: 2590 event.wait(wait) 2591 return x*x 2592 2593def mul(x, y): 2594 return x*y 2595 2596def raise_large_valuerror(wait): 2597 time.sleep(wait) 2598 raise ValueError("x" * 1024**2) 2599 2600def identity(x): 2601 return x 2602 2603class CountedObject(object): 2604 n_instances = 0 2605 2606 def __new__(cls): 2607 cls.n_instances += 1 2608 return object.__new__(cls) 2609 2610 def __del__(self): 2611 type(self).n_instances -= 1 2612 2613class SayWhenError(ValueError): pass 2614 2615def exception_throwing_generator(total, when): 2616 if when == -1: 2617 raise SayWhenError("Somebody said when") 2618 for i in range(total): 2619 if i == when: 2620 raise SayWhenError("Somebody said when") 2621 yield i 2622 2623 2624class _TestPool(BaseTestCase): 2625 2626 @classmethod 2627 def setUpClass(cls): 2628 super().setUpClass() 2629 cls.pool = cls.Pool(4) 2630 2631 @classmethod 2632 def tearDownClass(cls): 2633 cls.pool.terminate() 2634 cls.pool.join() 2635 cls.pool = None 2636 super().tearDownClass() 2637 2638 def test_apply(self): 2639 papply = self.pool.apply 2640 self.assertEqual(papply(sqr, (5,)), sqr(5)) 2641 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3)) 2642 2643 def test_map(self): 2644 pmap = self.pool.map 2645 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10))))) 2646 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20), 2647 list(map(sqr, list(range(100))))) 2648 2649 def test_starmap(self): 2650 psmap = self.pool.starmap 2651 tuples = list(zip(range(10), range(9,-1, -1))) 2652 self.assertEqual(psmap(mul, tuples), 2653 list(itertools.starmap(mul, tuples))) 2654 tuples = list(zip(range(100), range(99,-1, -1))) 2655 self.assertEqual(psmap(mul, tuples, chunksize=20), 2656 list(itertools.starmap(mul, tuples))) 2657 2658 def test_starmap_async(self): 2659 tuples = list(zip(range(100), range(99,-1, -1))) 2660 self.assertEqual(self.pool.starmap_async(mul, tuples).get(), 2661 list(itertools.starmap(mul, tuples))) 2662 2663 def test_map_async(self): 2664 self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(), 2665 list(map(sqr, list(range(10))))) 2666 2667 def test_map_async_callbacks(self): 2668 call_args = self.manager.list() if self.TYPE == 'manager' else [] 2669 self.pool.map_async(int, ['1'], 2670 callback=call_args.append, 2671 error_callback=call_args.append).wait() 2672 self.assertEqual(1, len(call_args)) 2673 self.assertEqual([1], call_args[0]) 2674 self.pool.map_async(int, ['a'], 2675 callback=call_args.append, 2676 error_callback=call_args.append).wait() 2677 self.assertEqual(2, len(call_args)) 2678 self.assertIsInstance(call_args[1], ValueError) 2679 2680 def test_map_unplicklable(self): 2681 # Issue #19425 -- failure to pickle should not cause a hang 2682 if self.TYPE == 'threads': 2683 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 2684 class A(object): 2685 def __reduce__(self): 2686 raise RuntimeError('cannot pickle') 2687 with self.assertRaises(RuntimeError): 2688 self.pool.map(sqr, [A()]*10) 2689 2690 def test_map_chunksize(self): 2691 try: 2692 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1) 2693 except multiprocessing.TimeoutError: 2694 self.fail("pool.map_async with chunksize stalled on null list") 2695 2696 def test_map_handle_iterable_exception(self): 2697 if self.TYPE == 'manager': 2698 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 2699 2700 # SayWhenError seen at the very first of the iterable 2701 with self.assertRaises(SayWhenError): 2702 self.pool.map(sqr, exception_throwing_generator(1, -1), 1) 2703 # again, make sure it's reentrant 2704 with self.assertRaises(SayWhenError): 2705 self.pool.map(sqr, exception_throwing_generator(1, -1), 1) 2706 2707 with self.assertRaises(SayWhenError): 2708 self.pool.map(sqr, exception_throwing_generator(10, 3), 1) 2709 2710 class SpecialIterable: 2711 def __iter__(self): 2712 return self 2713 def __next__(self): 2714 raise SayWhenError 2715 def __len__(self): 2716 return 1 2717 with self.assertRaises(SayWhenError): 2718 self.pool.map(sqr, SpecialIterable(), 1) 2719 with self.assertRaises(SayWhenError): 2720 self.pool.map(sqr, SpecialIterable(), 1) 2721 2722 def test_async(self): 2723 res = self.pool.apply_async(sqr, (7, TIMEOUT1,)) 2724 get = TimingWrapper(res.get) 2725 self.assertEqual(get(), 49) 2726 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1) 2727 2728 def test_async_timeout(self): 2729 p = self.Pool(3) 2730 try: 2731 event = threading.Event() if self.TYPE == 'threads' else None 2732 res = p.apply_async(sqr, (6, TIMEOUT2 + support.SHORT_TIMEOUT, event)) 2733 get = TimingWrapper(res.get) 2734 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2) 2735 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2) 2736 finally: 2737 if event is not None: 2738 event.set() 2739 p.terminate() 2740 p.join() 2741 2742 def test_imap(self): 2743 it = self.pool.imap(sqr, list(range(10))) 2744 self.assertEqual(list(it), list(map(sqr, list(range(10))))) 2745 2746 it = self.pool.imap(sqr, list(range(10))) 2747 for i in range(10): 2748 self.assertEqual(next(it), i*i) 2749 self.assertRaises(StopIteration, it.__next__) 2750 2751 it = self.pool.imap(sqr, list(range(1000)), chunksize=100) 2752 for i in range(1000): 2753 self.assertEqual(next(it), i*i) 2754 self.assertRaises(StopIteration, it.__next__) 2755 2756 def test_imap_handle_iterable_exception(self): 2757 if self.TYPE == 'manager': 2758 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 2759 2760 # SayWhenError seen at the very first of the iterable 2761 it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1) 2762 self.assertRaises(SayWhenError, it.__next__) 2763 # again, make sure it's reentrant 2764 it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1) 2765 self.assertRaises(SayWhenError, it.__next__) 2766 2767 it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1) 2768 for i in range(3): 2769 self.assertEqual(next(it), i*i) 2770 self.assertRaises(SayWhenError, it.__next__) 2771 2772 # SayWhenError seen at start of problematic chunk's results 2773 it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2) 2774 for i in range(6): 2775 self.assertEqual(next(it), i*i) 2776 self.assertRaises(SayWhenError, it.__next__) 2777 it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4) 2778 for i in range(4): 2779 self.assertEqual(next(it), i*i) 2780 self.assertRaises(SayWhenError, it.__next__) 2781 2782 def test_imap_unordered(self): 2783 it = self.pool.imap_unordered(sqr, list(range(10))) 2784 self.assertEqual(sorted(it), list(map(sqr, list(range(10))))) 2785 2786 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=100) 2787 self.assertEqual(sorted(it), list(map(sqr, list(range(1000))))) 2788 2789 def test_imap_unordered_handle_iterable_exception(self): 2790 if self.TYPE == 'manager': 2791 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 2792 2793 # SayWhenError seen at the very first of the iterable 2794 it = self.pool.imap_unordered(sqr, 2795 exception_throwing_generator(1, -1), 2796 1) 2797 self.assertRaises(SayWhenError, it.__next__) 2798 # again, make sure it's reentrant 2799 it = self.pool.imap_unordered(sqr, 2800 exception_throwing_generator(1, -1), 2801 1) 2802 self.assertRaises(SayWhenError, it.__next__) 2803 2804 it = self.pool.imap_unordered(sqr, 2805 exception_throwing_generator(10, 3), 2806 1) 2807 expected_values = list(map(sqr, list(range(10)))) 2808 with self.assertRaises(SayWhenError): 2809 # imap_unordered makes it difficult to anticipate the SayWhenError 2810 for i in range(10): 2811 value = next(it) 2812 self.assertIn(value, expected_values) 2813 expected_values.remove(value) 2814 2815 it = self.pool.imap_unordered(sqr, 2816 exception_throwing_generator(20, 7), 2817 2) 2818 expected_values = list(map(sqr, list(range(20)))) 2819 with self.assertRaises(SayWhenError): 2820 for i in range(20): 2821 value = next(it) 2822 self.assertIn(value, expected_values) 2823 expected_values.remove(value) 2824 2825 def test_make_pool(self): 2826 expected_error = (RemoteError if self.TYPE == 'manager' 2827 else ValueError) 2828 2829 self.assertRaises(expected_error, self.Pool, -1) 2830 self.assertRaises(expected_error, self.Pool, 0) 2831 2832 if self.TYPE != 'manager': 2833 p = self.Pool(3) 2834 try: 2835 self.assertEqual(3, len(p._pool)) 2836 finally: 2837 p.close() 2838 p.join() 2839 2840 def test_terminate(self): 2841 # Simulate slow tasks which take "forever" to complete 2842 sleep_time = support.LONG_TIMEOUT 2843 2844 if self.TYPE == 'threads': 2845 # Thread pool workers can't be forced to quit, so if the first 2846 # task starts early enough, we will end up waiting for it. 2847 # Sleep for a shorter time, so the test doesn't block. 2848 sleep_time = 1 2849 2850 p = self.Pool(3) 2851 args = [sleep_time for i in range(10_000)] 2852 result = p.map_async(time.sleep, args, chunksize=1) 2853 time.sleep(0.2) # give some tasks a chance to start 2854 p.terminate() 2855 p.join() 2856 2857 def test_empty_iterable(self): 2858 # See Issue 12157 2859 p = self.Pool(1) 2860 2861 self.assertEqual(p.map(sqr, []), []) 2862 self.assertEqual(list(p.imap(sqr, [])), []) 2863 self.assertEqual(list(p.imap_unordered(sqr, [])), []) 2864 self.assertEqual(p.map_async(sqr, []).get(), []) 2865 2866 p.close() 2867 p.join() 2868 2869 def test_context(self): 2870 if self.TYPE == 'processes': 2871 L = list(range(10)) 2872 expected = [sqr(i) for i in L] 2873 with self.Pool(2) as p: 2874 r = p.map_async(sqr, L) 2875 self.assertEqual(r.get(), expected) 2876 p.join() 2877 self.assertRaises(ValueError, p.map_async, sqr, L) 2878 2879 @classmethod 2880 def _test_traceback(cls): 2881 raise RuntimeError(123) # some comment 2882 2883 def test_traceback(self): 2884 # We want ensure that the traceback from the child process is 2885 # contained in the traceback raised in the main process. 2886 if self.TYPE == 'processes': 2887 with self.Pool(1) as p: 2888 try: 2889 p.apply(self._test_traceback) 2890 except Exception as e: 2891 exc = e 2892 else: 2893 self.fail('expected RuntimeError') 2894 p.join() 2895 self.assertIs(type(exc), RuntimeError) 2896 self.assertEqual(exc.args, (123,)) 2897 cause = exc.__cause__ 2898 self.assertIs(type(cause), multiprocessing.pool.RemoteTraceback) 2899 self.assertIn('raise RuntimeError(123) # some comment', cause.tb) 2900 2901 with test.support.captured_stderr() as f1: 2902 try: 2903 raise exc 2904 except RuntimeError: 2905 sys.excepthook(*sys.exc_info()) 2906 self.assertIn('raise RuntimeError(123) # some comment', 2907 f1.getvalue()) 2908 # _helper_reraises_exception should not make the error 2909 # a remote exception 2910 with self.Pool(1) as p: 2911 try: 2912 p.map(sqr, exception_throwing_generator(1, -1), 1) 2913 except Exception as e: 2914 exc = e 2915 else: 2916 self.fail('expected SayWhenError') 2917 self.assertIs(type(exc), SayWhenError) 2918 self.assertIs(exc.__cause__, None) 2919 p.join() 2920 2921 @classmethod 2922 def _test_wrapped_exception(cls): 2923 raise RuntimeError('foo') 2924 2925 def test_wrapped_exception(self): 2926 # Issue #20980: Should not wrap exception when using thread pool 2927 with self.Pool(1) as p: 2928 with self.assertRaises(RuntimeError): 2929 p.apply(self._test_wrapped_exception) 2930 p.join() 2931 2932 def test_map_no_failfast(self): 2933 # Issue #23992: the fail-fast behaviour when an exception is raised 2934 # during map() would make Pool.join() deadlock, because a worker 2935 # process would fill the result queue (after the result handler thread 2936 # terminated, hence not draining it anymore). 2937 2938 t_start = time.monotonic() 2939 2940 with self.assertRaises(ValueError): 2941 with self.Pool(2) as p: 2942 try: 2943 p.map(raise_large_valuerror, [0, 1]) 2944 finally: 2945 time.sleep(0.5) 2946 p.close() 2947 p.join() 2948 2949 # check that we indeed waited for all jobs 2950 self.assertGreater(time.monotonic() - t_start, 0.9) 2951 2952 def test_release_task_refs(self): 2953 # Issue #29861: task arguments and results should not be kept 2954 # alive after we are done with them. 2955 objs = [CountedObject() for i in range(10)] 2956 refs = [weakref.ref(o) for o in objs] 2957 self.pool.map(identity, objs) 2958 2959 del objs 2960 time.sleep(DELTA) # let threaded cleanup code run 2961 support.gc_collect() # For PyPy or other GCs. 2962 self.assertEqual(set(wr() for wr in refs), {None}) 2963 # With a process pool, copies of the objects are returned, check 2964 # they were released too. 2965 self.assertEqual(CountedObject.n_instances, 0) 2966 2967 def test_enter(self): 2968 if self.TYPE == 'manager': 2969 self.skipTest("test not applicable to manager") 2970 2971 pool = self.Pool(1) 2972 with pool: 2973 pass 2974 # call pool.terminate() 2975 # pool is no longer running 2976 2977 with self.assertRaises(ValueError): 2978 # bpo-35477: pool.__enter__() fails if the pool is not running 2979 with pool: 2980 pass 2981 pool.join() 2982 2983 def test_resource_warning(self): 2984 if self.TYPE == 'manager': 2985 self.skipTest("test not applicable to manager") 2986 2987 pool = self.Pool(1) 2988 pool.terminate() 2989 pool.join() 2990 2991 # force state to RUN to emit ResourceWarning in __del__() 2992 pool._state = multiprocessing.pool.RUN 2993 2994 with warnings_helper.check_warnings( 2995 ('unclosed running multiprocessing pool', ResourceWarning)): 2996 pool = None 2997 support.gc_collect() 2998 2999def raising(): 3000 raise KeyError("key") 3001 3002def unpickleable_result(): 3003 return lambda: 42 3004 3005class _TestPoolWorkerErrors(BaseTestCase): 3006 ALLOWED_TYPES = ('processes', ) 3007 3008 def test_async_error_callback(self): 3009 p = multiprocessing.Pool(2) 3010 3011 scratchpad = [None] 3012 def errback(exc): 3013 scratchpad[0] = exc 3014 3015 res = p.apply_async(raising, error_callback=errback) 3016 self.assertRaises(KeyError, res.get) 3017 self.assertTrue(scratchpad[0]) 3018 self.assertIsInstance(scratchpad[0], KeyError) 3019 3020 p.close() 3021 p.join() 3022 3023 def test_unpickleable_result(self): 3024 from multiprocessing.pool import MaybeEncodingError 3025 p = multiprocessing.Pool(2) 3026 3027 # Make sure we don't lose pool processes because of encoding errors. 3028 for iteration in range(20): 3029 3030 scratchpad = [None] 3031 def errback(exc): 3032 scratchpad[0] = exc 3033 3034 res = p.apply_async(unpickleable_result, error_callback=errback) 3035 self.assertRaises(MaybeEncodingError, res.get) 3036 wrapped = scratchpad[0] 3037 self.assertTrue(wrapped) 3038 self.assertIsInstance(scratchpad[0], MaybeEncodingError) 3039 self.assertIsNotNone(wrapped.exc) 3040 self.assertIsNotNone(wrapped.value) 3041 3042 p.close() 3043 p.join() 3044 3045class _TestPoolWorkerLifetime(BaseTestCase): 3046 ALLOWED_TYPES = ('processes', ) 3047 3048 def test_pool_worker_lifetime(self): 3049 p = multiprocessing.Pool(3, maxtasksperchild=10) 3050 self.assertEqual(3, len(p._pool)) 3051 origworkerpids = [w.pid for w in p._pool] 3052 # Run many tasks so each worker gets replaced (hopefully) 3053 results = [] 3054 for i in range(100): 3055 results.append(p.apply_async(sqr, (i, ))) 3056 # Fetch the results and verify we got the right answers, 3057 # also ensuring all the tasks have completed. 3058 for (j, res) in enumerate(results): 3059 self.assertEqual(res.get(), sqr(j)) 3060 # Refill the pool 3061 p._repopulate_pool() 3062 # Wait until all workers are alive 3063 # (countdown * DELTA = 5 seconds max startup process time) 3064 countdown = 50 3065 while countdown and not all(w.is_alive() for w in p._pool): 3066 countdown -= 1 3067 time.sleep(DELTA) 3068 finalworkerpids = [w.pid for w in p._pool] 3069 # All pids should be assigned. See issue #7805. 3070 self.assertNotIn(None, origworkerpids) 3071 self.assertNotIn(None, finalworkerpids) 3072 # Finally, check that the worker pids have changed 3073 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids)) 3074 p.close() 3075 p.join() 3076 3077 def test_pool_worker_lifetime_early_close(self): 3078 # Issue #10332: closing a pool whose workers have limited lifetimes 3079 # before all the tasks completed would make join() hang. 3080 p = multiprocessing.Pool(3, maxtasksperchild=1) 3081 results = [] 3082 for i in range(6): 3083 results.append(p.apply_async(sqr, (i, 0.3))) 3084 p.close() 3085 p.join() 3086 # check the results 3087 for (j, res) in enumerate(results): 3088 self.assertEqual(res.get(), sqr(j)) 3089 3090 def test_pool_maxtasksperchild_invalid(self): 3091 for value in [0, -1, 0.5, "12"]: 3092 with self.assertRaises(ValueError): 3093 multiprocessing.Pool(3, maxtasksperchild=value) 3094 3095 def test_worker_finalization_via_atexit_handler_of_multiprocessing(self): 3096 # tests cases against bpo-38744 and bpo-39360 3097 cmd = '''if 1: 3098 from multiprocessing import Pool 3099 problem = None 3100 class A: 3101 def __init__(self): 3102 self.pool = Pool(processes=1) 3103 def test(): 3104 global problem 3105 problem = A() 3106 problem.pool.map(float, tuple(range(10))) 3107 if __name__ == "__main__": 3108 test() 3109 ''' 3110 rc, out, err = test.support.script_helper.assert_python_ok('-c', cmd) 3111 self.assertEqual(rc, 0) 3112 3113# 3114# Test of creating a customized manager class 3115# 3116 3117from multiprocessing.managers import BaseManager, BaseProxy, RemoteError 3118 3119class FooBar(object): 3120 def f(self): 3121 return 'f()' 3122 def g(self): 3123 raise ValueError 3124 def _h(self): 3125 return '_h()' 3126 3127def baz(): 3128 for i in range(10): 3129 yield i*i 3130 3131class IteratorProxy(BaseProxy): 3132 _exposed_ = ('__next__',) 3133 def __iter__(self): 3134 return self 3135 def __next__(self): 3136 return self._callmethod('__next__') 3137 3138class MyManager(BaseManager): 3139 pass 3140 3141MyManager.register('Foo', callable=FooBar) 3142MyManager.register('Bar', callable=FooBar, exposed=('f', '_h')) 3143MyManager.register('baz', callable=baz, proxytype=IteratorProxy) 3144 3145 3146class _TestMyManager(BaseTestCase): 3147 3148 ALLOWED_TYPES = ('manager',) 3149 3150 def test_mymanager(self): 3151 manager = MyManager(shutdown_timeout=SHUTDOWN_TIMEOUT) 3152 manager.start() 3153 self.common(manager) 3154 manager.shutdown() 3155 3156 # bpo-30356: BaseManager._finalize_manager() sends SIGTERM 3157 # to the manager process if it takes longer than 1 second to stop, 3158 # which happens on slow buildbots. 3159 self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM)) 3160 3161 def test_mymanager_context(self): 3162 manager = MyManager(shutdown_timeout=SHUTDOWN_TIMEOUT) 3163 with manager: 3164 self.common(manager) 3165 # bpo-30356: BaseManager._finalize_manager() sends SIGTERM 3166 # to the manager process if it takes longer than 1 second to stop, 3167 # which happens on slow buildbots. 3168 self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM)) 3169 3170 def test_mymanager_context_prestarted(self): 3171 manager = MyManager(shutdown_timeout=SHUTDOWN_TIMEOUT) 3172 manager.start() 3173 with manager: 3174 self.common(manager) 3175 self.assertEqual(manager._process.exitcode, 0) 3176 3177 def common(self, manager): 3178 foo = manager.Foo() 3179 bar = manager.Bar() 3180 baz = manager.baz() 3181 3182 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)] 3183 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)] 3184 3185 self.assertEqual(foo_methods, ['f', 'g']) 3186 self.assertEqual(bar_methods, ['f', '_h']) 3187 3188 self.assertEqual(foo.f(), 'f()') 3189 self.assertRaises(ValueError, foo.g) 3190 self.assertEqual(foo._callmethod('f'), 'f()') 3191 self.assertRaises(RemoteError, foo._callmethod, '_h') 3192 3193 self.assertEqual(bar.f(), 'f()') 3194 self.assertEqual(bar._h(), '_h()') 3195 self.assertEqual(bar._callmethod('f'), 'f()') 3196 self.assertEqual(bar._callmethod('_h'), '_h()') 3197 3198 self.assertEqual(list(baz), [i*i for i in range(10)]) 3199 3200 3201# 3202# Test of connecting to a remote server and using xmlrpclib for serialization 3203# 3204 3205_queue = pyqueue.Queue() 3206def get_queue(): 3207 return _queue 3208 3209class QueueManager(BaseManager): 3210 '''manager class used by server process''' 3211QueueManager.register('get_queue', callable=get_queue) 3212 3213class QueueManager2(BaseManager): 3214 '''manager class which specifies the same interface as QueueManager''' 3215QueueManager2.register('get_queue') 3216 3217 3218SERIALIZER = 'xmlrpclib' 3219 3220class _TestRemoteManager(BaseTestCase): 3221 3222 ALLOWED_TYPES = ('manager',) 3223 values = ['hello world', None, True, 2.25, 3224 'hall\xe5 v\xe4rlden', 3225 '\u043f\u0440\u0438\u0432\u0456\u0442 \u0441\u0432\u0456\u0442', 3226 b'hall\xe5 v\xe4rlden', 3227 ] 3228 result = values[:] 3229 3230 @classmethod 3231 def _putter(cls, address, authkey): 3232 manager = QueueManager2( 3233 address=address, authkey=authkey, serializer=SERIALIZER, 3234 shutdown_timeout=SHUTDOWN_TIMEOUT) 3235 manager.connect() 3236 queue = manager.get_queue() 3237 # Note that xmlrpclib will deserialize object as a list not a tuple 3238 queue.put(tuple(cls.values)) 3239 3240 def test_remote(self): 3241 authkey = os.urandom(32) 3242 3243 manager = QueueManager( 3244 address=(socket_helper.HOST, 0), authkey=authkey, serializer=SERIALIZER, 3245 shutdown_timeout=SHUTDOWN_TIMEOUT) 3246 manager.start() 3247 self.addCleanup(manager.shutdown) 3248 3249 p = self.Process(target=self._putter, args=(manager.address, authkey)) 3250 p.daemon = True 3251 p.start() 3252 3253 manager2 = QueueManager2( 3254 address=manager.address, authkey=authkey, serializer=SERIALIZER, 3255 shutdown_timeout=SHUTDOWN_TIMEOUT) 3256 manager2.connect() 3257 queue = manager2.get_queue() 3258 3259 self.assertEqual(queue.get(), self.result) 3260 3261 # Because we are using xmlrpclib for serialization instead of 3262 # pickle this will cause a serialization error. 3263 self.assertRaises(Exception, queue.put, time.sleep) 3264 3265 # Make queue finalizer run before the server is stopped 3266 del queue 3267 3268 3269@hashlib_helper.requires_hashdigest('sha256') 3270class _TestManagerRestart(BaseTestCase): 3271 3272 @classmethod 3273 def _putter(cls, address, authkey): 3274 manager = QueueManager( 3275 address=address, authkey=authkey, serializer=SERIALIZER, 3276 shutdown_timeout=SHUTDOWN_TIMEOUT) 3277 manager.connect() 3278 queue = manager.get_queue() 3279 queue.put('hello world') 3280 3281 def test_rapid_restart(self): 3282 authkey = os.urandom(32) 3283 manager = QueueManager( 3284 address=(socket_helper.HOST, 0), authkey=authkey, 3285 serializer=SERIALIZER, shutdown_timeout=SHUTDOWN_TIMEOUT) 3286 try: 3287 srvr = manager.get_server() 3288 addr = srvr.address 3289 # Close the connection.Listener socket which gets opened as a part 3290 # of manager.get_server(). It's not needed for the test. 3291 srvr.listener.close() 3292 manager.start() 3293 3294 p = self.Process(target=self._putter, args=(manager.address, authkey)) 3295 p.start() 3296 p.join() 3297 queue = manager.get_queue() 3298 self.assertEqual(queue.get(), 'hello world') 3299 del queue 3300 finally: 3301 if hasattr(manager, "shutdown"): 3302 manager.shutdown() 3303 3304 manager = QueueManager( 3305 address=addr, authkey=authkey, serializer=SERIALIZER, 3306 shutdown_timeout=SHUTDOWN_TIMEOUT) 3307 try: 3308 manager.start() 3309 self.addCleanup(manager.shutdown) 3310 except OSError as e: 3311 if e.errno != errno.EADDRINUSE: 3312 raise 3313 # Retry after some time, in case the old socket was lingering 3314 # (sporadic failure on buildbots) 3315 time.sleep(1.0) 3316 manager = QueueManager( 3317 address=addr, authkey=authkey, serializer=SERIALIZER, 3318 shutdown_timeout=SHUTDOWN_TIMEOUT) 3319 if hasattr(manager, "shutdown"): 3320 self.addCleanup(manager.shutdown) 3321 3322 3323class FakeConnection: 3324 def send(self, payload): 3325 pass 3326 3327 def recv(self): 3328 return '#ERROR', pyqueue.Empty() 3329 3330class TestManagerExceptions(unittest.TestCase): 3331 # Issue 106558: Manager exceptions avoids creating cyclic references. 3332 def setUp(self): 3333 self.mgr = multiprocessing.Manager() 3334 3335 def tearDown(self): 3336 self.mgr.shutdown() 3337 self.mgr.join() 3338 3339 def test_queue_get(self): 3340 queue = self.mgr.Queue() 3341 if gc.isenabled(): 3342 gc.disable() 3343 self.addCleanup(gc.enable) 3344 try: 3345 queue.get_nowait() 3346 except pyqueue.Empty as e: 3347 wr = weakref.ref(e) 3348 self.assertEqual(wr(), None) 3349 3350 def test_dispatch(self): 3351 if gc.isenabled(): 3352 gc.disable() 3353 self.addCleanup(gc.enable) 3354 try: 3355 multiprocessing.managers.dispatch(FakeConnection(), None, None) 3356 except pyqueue.Empty as e: 3357 wr = weakref.ref(e) 3358 self.assertEqual(wr(), None) 3359 3360# 3361# 3362# 3363 3364SENTINEL = latin('') 3365 3366class _TestConnection(BaseTestCase): 3367 3368 ALLOWED_TYPES = ('processes', 'threads') 3369 3370 @classmethod 3371 def _echo(cls, conn): 3372 for msg in iter(conn.recv_bytes, SENTINEL): 3373 conn.send_bytes(msg) 3374 conn.close() 3375 3376 def test_connection(self): 3377 conn, child_conn = self.Pipe() 3378 3379 p = self.Process(target=self._echo, args=(child_conn,)) 3380 p.daemon = True 3381 p.start() 3382 3383 seq = [1, 2.25, None] 3384 msg = latin('hello world') 3385 longmsg = msg * 10 3386 arr = array.array('i', list(range(4))) 3387 3388 if self.TYPE == 'processes': 3389 self.assertEqual(type(conn.fileno()), int) 3390 3391 self.assertEqual(conn.send(seq), None) 3392 self.assertEqual(conn.recv(), seq) 3393 3394 self.assertEqual(conn.send_bytes(msg), None) 3395 self.assertEqual(conn.recv_bytes(), msg) 3396 3397 if self.TYPE == 'processes': 3398 buffer = array.array('i', [0]*10) 3399 expected = list(arr) + [0] * (10 - len(arr)) 3400 self.assertEqual(conn.send_bytes(arr), None) 3401 self.assertEqual(conn.recv_bytes_into(buffer), 3402 len(arr) * buffer.itemsize) 3403 self.assertEqual(list(buffer), expected) 3404 3405 buffer = array.array('i', [0]*10) 3406 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr)) 3407 self.assertEqual(conn.send_bytes(arr), None) 3408 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize), 3409 len(arr) * buffer.itemsize) 3410 self.assertEqual(list(buffer), expected) 3411 3412 buffer = bytearray(latin(' ' * 40)) 3413 self.assertEqual(conn.send_bytes(longmsg), None) 3414 try: 3415 res = conn.recv_bytes_into(buffer) 3416 except multiprocessing.BufferTooShort as e: 3417 self.assertEqual(e.args, (longmsg,)) 3418 else: 3419 self.fail('expected BufferTooShort, got %s' % res) 3420 3421 poll = TimingWrapper(conn.poll) 3422 3423 self.assertEqual(poll(), False) 3424 self.assertTimingAlmostEqual(poll.elapsed, 0) 3425 3426 self.assertEqual(poll(-1), False) 3427 self.assertTimingAlmostEqual(poll.elapsed, 0) 3428 3429 self.assertEqual(poll(TIMEOUT1), False) 3430 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1) 3431 3432 conn.send(None) 3433 time.sleep(.1) 3434 3435 self.assertEqual(poll(TIMEOUT1), True) 3436 self.assertTimingAlmostEqual(poll.elapsed, 0) 3437 3438 self.assertEqual(conn.recv(), None) 3439 3440 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb 3441 conn.send_bytes(really_big_msg) 3442 self.assertEqual(conn.recv_bytes(), really_big_msg) 3443 3444 conn.send_bytes(SENTINEL) # tell child to quit 3445 child_conn.close() 3446 3447 if self.TYPE == 'processes': 3448 self.assertEqual(conn.readable, True) 3449 self.assertEqual(conn.writable, True) 3450 self.assertRaises(EOFError, conn.recv) 3451 self.assertRaises(EOFError, conn.recv_bytes) 3452 3453 p.join() 3454 3455 def test_duplex_false(self): 3456 reader, writer = self.Pipe(duplex=False) 3457 self.assertEqual(writer.send(1), None) 3458 self.assertEqual(reader.recv(), 1) 3459 if self.TYPE == 'processes': 3460 self.assertEqual(reader.readable, True) 3461 self.assertEqual(reader.writable, False) 3462 self.assertEqual(writer.readable, False) 3463 self.assertEqual(writer.writable, True) 3464 self.assertRaises(OSError, reader.send, 2) 3465 self.assertRaises(OSError, writer.recv) 3466 self.assertRaises(OSError, writer.poll) 3467 3468 def test_spawn_close(self): 3469 # We test that a pipe connection can be closed by parent 3470 # process immediately after child is spawned. On Windows this 3471 # would have sometimes failed on old versions because 3472 # child_conn would be closed before the child got a chance to 3473 # duplicate it. 3474 conn, child_conn = self.Pipe() 3475 3476 p = self.Process(target=self._echo, args=(child_conn,)) 3477 p.daemon = True 3478 p.start() 3479 child_conn.close() # this might complete before child initializes 3480 3481 msg = latin('hello') 3482 conn.send_bytes(msg) 3483 self.assertEqual(conn.recv_bytes(), msg) 3484 3485 conn.send_bytes(SENTINEL) 3486 conn.close() 3487 p.join() 3488 3489 def test_sendbytes(self): 3490 if self.TYPE != 'processes': 3491 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 3492 3493 msg = latin('abcdefghijklmnopqrstuvwxyz') 3494 a, b = self.Pipe() 3495 3496 a.send_bytes(msg) 3497 self.assertEqual(b.recv_bytes(), msg) 3498 3499 a.send_bytes(msg, 5) 3500 self.assertEqual(b.recv_bytes(), msg[5:]) 3501 3502 a.send_bytes(msg, 7, 8) 3503 self.assertEqual(b.recv_bytes(), msg[7:7+8]) 3504 3505 a.send_bytes(msg, 26) 3506 self.assertEqual(b.recv_bytes(), latin('')) 3507 3508 a.send_bytes(msg, 26, 0) 3509 self.assertEqual(b.recv_bytes(), latin('')) 3510 3511 self.assertRaises(ValueError, a.send_bytes, msg, 27) 3512 3513 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5) 3514 3515 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1) 3516 3517 self.assertRaises(ValueError, a.send_bytes, msg, -1) 3518 3519 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1) 3520 3521 @classmethod 3522 def _is_fd_assigned(cls, fd): 3523 try: 3524 os.fstat(fd) 3525 except OSError as e: 3526 if e.errno == errno.EBADF: 3527 return False 3528 raise 3529 else: 3530 return True 3531 3532 @classmethod 3533 def _writefd(cls, conn, data, create_dummy_fds=False): 3534 if create_dummy_fds: 3535 for i in range(0, 256): 3536 if not cls._is_fd_assigned(i): 3537 os.dup2(conn.fileno(), i) 3538 fd = reduction.recv_handle(conn) 3539 if msvcrt: 3540 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY) 3541 os.write(fd, data) 3542 os.close(fd) 3543 3544 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 3545 def test_fd_transfer(self): 3546 if self.TYPE != 'processes': 3547 self.skipTest("only makes sense with processes") 3548 conn, child_conn = self.Pipe(duplex=True) 3549 3550 p = self.Process(target=self._writefd, args=(child_conn, b"foo")) 3551 p.daemon = True 3552 p.start() 3553 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 3554 with open(os_helper.TESTFN, "wb") as f: 3555 fd = f.fileno() 3556 if msvcrt: 3557 fd = msvcrt.get_osfhandle(fd) 3558 reduction.send_handle(conn, fd, p.pid) 3559 p.join() 3560 with open(os_helper.TESTFN, "rb") as f: 3561 self.assertEqual(f.read(), b"foo") 3562 3563 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 3564 @unittest.skipIf(sys.platform == "win32", 3565 "test semantics don't make sense on Windows") 3566 @unittest.skipIf(MAXFD <= 256, 3567 "largest assignable fd number is too small") 3568 @unittest.skipUnless(hasattr(os, "dup2"), 3569 "test needs os.dup2()") 3570 def test_large_fd_transfer(self): 3571 # With fd > 256 (issue #11657) 3572 if self.TYPE != 'processes': 3573 self.skipTest("only makes sense with processes") 3574 conn, child_conn = self.Pipe(duplex=True) 3575 3576 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True)) 3577 p.daemon = True 3578 p.start() 3579 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 3580 with open(os_helper.TESTFN, "wb") as f: 3581 fd = f.fileno() 3582 for newfd in range(256, MAXFD): 3583 if not self._is_fd_assigned(newfd): 3584 break 3585 else: 3586 self.fail("could not find an unassigned large file descriptor") 3587 os.dup2(fd, newfd) 3588 try: 3589 reduction.send_handle(conn, newfd, p.pid) 3590 finally: 3591 os.close(newfd) 3592 p.join() 3593 with open(os_helper.TESTFN, "rb") as f: 3594 self.assertEqual(f.read(), b"bar") 3595 3596 @classmethod 3597 def _send_data_without_fd(self, conn): 3598 os.write(conn.fileno(), b"\0") 3599 3600 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 3601 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows") 3602 def test_missing_fd_transfer(self): 3603 # Check that exception is raised when received data is not 3604 # accompanied by a file descriptor in ancillary data. 3605 if self.TYPE != 'processes': 3606 self.skipTest("only makes sense with processes") 3607 conn, child_conn = self.Pipe(duplex=True) 3608 3609 p = self.Process(target=self._send_data_without_fd, args=(child_conn,)) 3610 p.daemon = True 3611 p.start() 3612 self.assertRaises(RuntimeError, reduction.recv_handle, conn) 3613 p.join() 3614 3615 def test_context(self): 3616 a, b = self.Pipe() 3617 3618 with a, b: 3619 a.send(1729) 3620 self.assertEqual(b.recv(), 1729) 3621 if self.TYPE == 'processes': 3622 self.assertFalse(a.closed) 3623 self.assertFalse(b.closed) 3624 3625 if self.TYPE == 'processes': 3626 self.assertTrue(a.closed) 3627 self.assertTrue(b.closed) 3628 self.assertRaises(OSError, a.recv) 3629 self.assertRaises(OSError, b.recv) 3630 3631class _TestListener(BaseTestCase): 3632 3633 ALLOWED_TYPES = ('processes',) 3634 3635 def test_multiple_bind(self): 3636 for family in self.connection.families: 3637 l = self.connection.Listener(family=family) 3638 self.addCleanup(l.close) 3639 self.assertRaises(OSError, self.connection.Listener, 3640 l.address, family) 3641 3642 def test_context(self): 3643 with self.connection.Listener() as l: 3644 with self.connection.Client(l.address) as c: 3645 with l.accept() as d: 3646 c.send(1729) 3647 self.assertEqual(d.recv(), 1729) 3648 3649 if self.TYPE == 'processes': 3650 self.assertRaises(OSError, l.accept) 3651 3652 def test_empty_authkey(self): 3653 # bpo-43952: allow empty bytes as authkey 3654 def handler(*args): 3655 raise RuntimeError('Connection took too long...') 3656 3657 def run(addr, authkey): 3658 client = self.connection.Client(addr, authkey=authkey) 3659 client.send(1729) 3660 3661 key = b'' 3662 3663 with self.connection.Listener(authkey=key) as listener: 3664 thread = threading.Thread(target=run, args=(listener.address, key)) 3665 thread.start() 3666 try: 3667 with listener.accept() as d: 3668 self.assertEqual(d.recv(), 1729) 3669 finally: 3670 thread.join() 3671 3672 if self.TYPE == 'processes': 3673 with self.assertRaises(OSError): 3674 listener.accept() 3675 3676 @unittest.skipUnless(util.abstract_sockets_supported, 3677 "test needs abstract socket support") 3678 def test_abstract_socket(self): 3679 with self.connection.Listener("\0something") as listener: 3680 with self.connection.Client(listener.address) as client: 3681 with listener.accept() as d: 3682 client.send(1729) 3683 self.assertEqual(d.recv(), 1729) 3684 3685 if self.TYPE == 'processes': 3686 self.assertRaises(OSError, listener.accept) 3687 3688 3689class _TestListenerClient(BaseTestCase): 3690 3691 ALLOWED_TYPES = ('processes', 'threads') 3692 3693 @classmethod 3694 def _test(cls, address): 3695 conn = cls.connection.Client(address) 3696 conn.send('hello') 3697 conn.close() 3698 3699 def test_listener_client(self): 3700 for family in self.connection.families: 3701 l = self.connection.Listener(family=family) 3702 p = self.Process(target=self._test, args=(l.address,)) 3703 p.daemon = True 3704 p.start() 3705 conn = l.accept() 3706 self.assertEqual(conn.recv(), 'hello') 3707 p.join() 3708 l.close() 3709 3710 def test_issue14725(self): 3711 l = self.connection.Listener() 3712 p = self.Process(target=self._test, args=(l.address,)) 3713 p.daemon = True 3714 p.start() 3715 time.sleep(1) 3716 # On Windows the client process should by now have connected, 3717 # written data and closed the pipe handle by now. This causes 3718 # ConnectNamdedPipe() to fail with ERROR_NO_DATA. See Issue 3719 # 14725. 3720 conn = l.accept() 3721 self.assertEqual(conn.recv(), 'hello') 3722 conn.close() 3723 p.join() 3724 l.close() 3725 3726 def test_issue16955(self): 3727 for fam in self.connection.families: 3728 l = self.connection.Listener(family=fam) 3729 c = self.connection.Client(l.address) 3730 a = l.accept() 3731 a.send_bytes(b"hello") 3732 self.assertTrue(c.poll(1)) 3733 a.close() 3734 c.close() 3735 l.close() 3736 3737class _TestPoll(BaseTestCase): 3738 3739 ALLOWED_TYPES = ('processes', 'threads') 3740 3741 def test_empty_string(self): 3742 a, b = self.Pipe() 3743 self.assertEqual(a.poll(), False) 3744 b.send_bytes(b'') 3745 self.assertEqual(a.poll(), True) 3746 self.assertEqual(a.poll(), True) 3747 3748 @classmethod 3749 def _child_strings(cls, conn, strings): 3750 for s in strings: 3751 time.sleep(0.1) 3752 conn.send_bytes(s) 3753 conn.close() 3754 3755 def test_strings(self): 3756 strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop') 3757 a, b = self.Pipe() 3758 p = self.Process(target=self._child_strings, args=(b, strings)) 3759 p.start() 3760 3761 for s in strings: 3762 for i in range(200): 3763 if a.poll(0.01): 3764 break 3765 x = a.recv_bytes() 3766 self.assertEqual(s, x) 3767 3768 p.join() 3769 3770 @classmethod 3771 def _child_boundaries(cls, r): 3772 # Polling may "pull" a message in to the child process, but we 3773 # don't want it to pull only part of a message, as that would 3774 # corrupt the pipe for any other processes which might later 3775 # read from it. 3776 r.poll(5) 3777 3778 def test_boundaries(self): 3779 r, w = self.Pipe(False) 3780 p = self.Process(target=self._child_boundaries, args=(r,)) 3781 p.start() 3782 time.sleep(2) 3783 L = [b"first", b"second"] 3784 for obj in L: 3785 w.send_bytes(obj) 3786 w.close() 3787 p.join() 3788 self.assertIn(r.recv_bytes(), L) 3789 3790 @classmethod 3791 def _child_dont_merge(cls, b): 3792 b.send_bytes(b'a') 3793 b.send_bytes(b'b') 3794 b.send_bytes(b'cd') 3795 3796 def test_dont_merge(self): 3797 a, b = self.Pipe() 3798 self.assertEqual(a.poll(0.0), False) 3799 self.assertEqual(a.poll(0.1), False) 3800 3801 p = self.Process(target=self._child_dont_merge, args=(b,)) 3802 p.start() 3803 3804 self.assertEqual(a.recv_bytes(), b'a') 3805 self.assertEqual(a.poll(1.0), True) 3806 self.assertEqual(a.poll(1.0), True) 3807 self.assertEqual(a.recv_bytes(), b'b') 3808 self.assertEqual(a.poll(1.0), True) 3809 self.assertEqual(a.poll(1.0), True) 3810 self.assertEqual(a.poll(0.0), True) 3811 self.assertEqual(a.recv_bytes(), b'cd') 3812 3813 p.join() 3814 3815# 3816# Test of sending connection and socket objects between processes 3817# 3818 3819@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 3820@hashlib_helper.requires_hashdigest('sha256') 3821class _TestPicklingConnections(BaseTestCase): 3822 3823 ALLOWED_TYPES = ('processes',) 3824 3825 @classmethod 3826 def tearDownClass(cls): 3827 from multiprocessing import resource_sharer 3828 resource_sharer.stop(timeout=support.LONG_TIMEOUT) 3829 3830 @classmethod 3831 def _listener(cls, conn, families): 3832 for fam in families: 3833 l = cls.connection.Listener(family=fam) 3834 conn.send(l.address) 3835 new_conn = l.accept() 3836 conn.send(new_conn) 3837 new_conn.close() 3838 l.close() 3839 3840 l = socket.create_server((socket_helper.HOST, 0)) 3841 conn.send(l.getsockname()) 3842 new_conn, addr = l.accept() 3843 conn.send(new_conn) 3844 new_conn.close() 3845 l.close() 3846 3847 conn.recv() 3848 3849 @classmethod 3850 def _remote(cls, conn): 3851 for (address, msg) in iter(conn.recv, None): 3852 client = cls.connection.Client(address) 3853 client.send(msg.upper()) 3854 client.close() 3855 3856 address, msg = conn.recv() 3857 client = socket.socket() 3858 client.connect(address) 3859 client.sendall(msg.upper()) 3860 client.close() 3861 3862 conn.close() 3863 3864 def test_pickling(self): 3865 families = self.connection.families 3866 3867 lconn, lconn0 = self.Pipe() 3868 lp = self.Process(target=self._listener, args=(lconn0, families)) 3869 lp.daemon = True 3870 lp.start() 3871 lconn0.close() 3872 3873 rconn, rconn0 = self.Pipe() 3874 rp = self.Process(target=self._remote, args=(rconn0,)) 3875 rp.daemon = True 3876 rp.start() 3877 rconn0.close() 3878 3879 for fam in families: 3880 msg = ('This connection uses family %s' % fam).encode('ascii') 3881 address = lconn.recv() 3882 rconn.send((address, msg)) 3883 new_conn = lconn.recv() 3884 self.assertEqual(new_conn.recv(), msg.upper()) 3885 3886 rconn.send(None) 3887 3888 msg = latin('This connection uses a normal socket') 3889 address = lconn.recv() 3890 rconn.send((address, msg)) 3891 new_conn = lconn.recv() 3892 buf = [] 3893 while True: 3894 s = new_conn.recv(100) 3895 if not s: 3896 break 3897 buf.append(s) 3898 buf = b''.join(buf) 3899 self.assertEqual(buf, msg.upper()) 3900 new_conn.close() 3901 3902 lconn.send(None) 3903 3904 rconn.close() 3905 lconn.close() 3906 3907 lp.join() 3908 rp.join() 3909 3910 @classmethod 3911 def child_access(cls, conn): 3912 w = conn.recv() 3913 w.send('all is well') 3914 w.close() 3915 3916 r = conn.recv() 3917 msg = r.recv() 3918 conn.send(msg*2) 3919 3920 conn.close() 3921 3922 def test_access(self): 3923 # On Windows, if we do not specify a destination pid when 3924 # using DupHandle then we need to be careful to use the 3925 # correct access flags for DuplicateHandle(), or else 3926 # DupHandle.detach() will raise PermissionError. For example, 3927 # for a read only pipe handle we should use 3928 # access=FILE_GENERIC_READ. (Unfortunately 3929 # DUPLICATE_SAME_ACCESS does not work.) 3930 conn, child_conn = self.Pipe() 3931 p = self.Process(target=self.child_access, args=(child_conn,)) 3932 p.daemon = True 3933 p.start() 3934 child_conn.close() 3935 3936 r, w = self.Pipe(duplex=False) 3937 conn.send(w) 3938 w.close() 3939 self.assertEqual(r.recv(), 'all is well') 3940 r.close() 3941 3942 r, w = self.Pipe(duplex=False) 3943 conn.send(r) 3944 r.close() 3945 w.send('foobar') 3946 w.close() 3947 self.assertEqual(conn.recv(), 'foobar'*2) 3948 3949 p.join() 3950 3951# 3952# 3953# 3954 3955class _TestHeap(BaseTestCase): 3956 3957 ALLOWED_TYPES = ('processes',) 3958 3959 def setUp(self): 3960 super().setUp() 3961 # Make pristine heap for these tests 3962 self.old_heap = multiprocessing.heap.BufferWrapper._heap 3963 multiprocessing.heap.BufferWrapper._heap = multiprocessing.heap.Heap() 3964 3965 def tearDown(self): 3966 multiprocessing.heap.BufferWrapper._heap = self.old_heap 3967 super().tearDown() 3968 3969 def test_heap(self): 3970 iterations = 5000 3971 maxblocks = 50 3972 blocks = [] 3973 3974 # get the heap object 3975 heap = multiprocessing.heap.BufferWrapper._heap 3976 heap._DISCARD_FREE_SPACE_LARGER_THAN = 0 3977 3978 # create and destroy lots of blocks of different sizes 3979 for i in range(iterations): 3980 size = int(random.lognormvariate(0, 1) * 1000) 3981 b = multiprocessing.heap.BufferWrapper(size) 3982 blocks.append(b) 3983 if len(blocks) > maxblocks: 3984 i = random.randrange(maxblocks) 3985 del blocks[i] 3986 del b 3987 3988 # verify the state of the heap 3989 with heap._lock: 3990 all = [] 3991 free = 0 3992 occupied = 0 3993 for L in list(heap._len_to_seq.values()): 3994 # count all free blocks in arenas 3995 for arena, start, stop in L: 3996 all.append((heap._arenas.index(arena), start, stop, 3997 stop-start, 'free')) 3998 free += (stop-start) 3999 for arena, arena_blocks in heap._allocated_blocks.items(): 4000 # count all allocated blocks in arenas 4001 for start, stop in arena_blocks: 4002 all.append((heap._arenas.index(arena), start, stop, 4003 stop-start, 'occupied')) 4004 occupied += (stop-start) 4005 4006 self.assertEqual(free + occupied, 4007 sum(arena.size for arena in heap._arenas)) 4008 4009 all.sort() 4010 4011 for i in range(len(all)-1): 4012 (arena, start, stop) = all[i][:3] 4013 (narena, nstart, nstop) = all[i+1][:3] 4014 if arena != narena: 4015 # Two different arenas 4016 self.assertEqual(stop, heap._arenas[arena].size) # last block 4017 self.assertEqual(nstart, 0) # first block 4018 else: 4019 # Same arena: two adjacent blocks 4020 self.assertEqual(stop, nstart) 4021 4022 # test free'ing all blocks 4023 random.shuffle(blocks) 4024 while blocks: 4025 blocks.pop() 4026 4027 self.assertEqual(heap._n_frees, heap._n_mallocs) 4028 self.assertEqual(len(heap._pending_free_blocks), 0) 4029 self.assertEqual(len(heap._arenas), 0) 4030 self.assertEqual(len(heap._allocated_blocks), 0, heap._allocated_blocks) 4031 self.assertEqual(len(heap._len_to_seq), 0) 4032 4033 def test_free_from_gc(self): 4034 # Check that freeing of blocks by the garbage collector doesn't deadlock 4035 # (issue #12352). 4036 # Make sure the GC is enabled, and set lower collection thresholds to 4037 # make collections more frequent (and increase the probability of 4038 # deadlock). 4039 if not gc.isenabled(): 4040 gc.enable() 4041 self.addCleanup(gc.disable) 4042 thresholds = gc.get_threshold() 4043 self.addCleanup(gc.set_threshold, *thresholds) 4044 gc.set_threshold(10) 4045 4046 # perform numerous block allocations, with cyclic references to make 4047 # sure objects are collected asynchronously by the gc 4048 for i in range(5000): 4049 a = multiprocessing.heap.BufferWrapper(1) 4050 b = multiprocessing.heap.BufferWrapper(1) 4051 # circular references 4052 a.buddy = b 4053 b.buddy = a 4054 4055# 4056# 4057# 4058 4059class _Foo(Structure): 4060 _fields_ = [ 4061 ('x', c_int), 4062 ('y', c_double), 4063 ('z', c_longlong,) 4064 ] 4065 4066class _TestSharedCTypes(BaseTestCase): 4067 4068 ALLOWED_TYPES = ('processes',) 4069 4070 def setUp(self): 4071 if not HAS_SHAREDCTYPES: 4072 self.skipTest("requires multiprocessing.sharedctypes") 4073 4074 @classmethod 4075 def _double(cls, x, y, z, foo, arr, string): 4076 x.value *= 2 4077 y.value *= 2 4078 z.value *= 2 4079 foo.x *= 2 4080 foo.y *= 2 4081 string.value *= 2 4082 for i in range(len(arr)): 4083 arr[i] *= 2 4084 4085 def test_sharedctypes(self, lock=False): 4086 x = Value('i', 7, lock=lock) 4087 y = Value(c_double, 1.0/3.0, lock=lock) 4088 z = Value(c_longlong, 2 ** 33, lock=lock) 4089 foo = Value(_Foo, 3, 2, lock=lock) 4090 arr = self.Array('d', list(range(10)), lock=lock) 4091 string = self.Array('c', 20, lock=lock) 4092 string.value = latin('hello') 4093 4094 p = self.Process(target=self._double, args=(x, y, z, foo, arr, string)) 4095 p.daemon = True 4096 p.start() 4097 p.join() 4098 4099 self.assertEqual(x.value, 14) 4100 self.assertAlmostEqual(y.value, 2.0/3.0) 4101 self.assertEqual(z.value, 2 ** 34) 4102 self.assertEqual(foo.x, 6) 4103 self.assertAlmostEqual(foo.y, 4.0) 4104 for i in range(10): 4105 self.assertAlmostEqual(arr[i], i*2) 4106 self.assertEqual(string.value, latin('hellohello')) 4107 4108 def test_synchronize(self): 4109 self.test_sharedctypes(lock=True) 4110 4111 def test_copy(self): 4112 foo = _Foo(2, 5.0, 2 ** 33) 4113 bar = copy(foo) 4114 foo.x = 0 4115 foo.y = 0 4116 foo.z = 0 4117 self.assertEqual(bar.x, 2) 4118 self.assertAlmostEqual(bar.y, 5.0) 4119 self.assertEqual(bar.z, 2 ** 33) 4120 4121 4122@unittest.skipUnless(HAS_SHMEM, "requires multiprocessing.shared_memory") 4123@hashlib_helper.requires_hashdigest('sha256') 4124class _TestSharedMemory(BaseTestCase): 4125 4126 ALLOWED_TYPES = ('processes',) 4127 4128 @staticmethod 4129 def _attach_existing_shmem_then_write(shmem_name_or_obj, binary_data): 4130 if isinstance(shmem_name_or_obj, str): 4131 local_sms = shared_memory.SharedMemory(shmem_name_or_obj) 4132 else: 4133 local_sms = shmem_name_or_obj 4134 local_sms.buf[:len(binary_data)] = binary_data 4135 local_sms.close() 4136 4137 def _new_shm_name(self, prefix): 4138 # Add a PID to the name of a POSIX shared memory object to allow 4139 # running multiprocessing tests (test_multiprocessing_fork, 4140 # test_multiprocessing_spawn, etc) in parallel. 4141 return prefix + str(os.getpid()) 4142 4143 def test_shared_memory_name_with_embedded_null(self): 4144 name_tsmb = self._new_shm_name('test01_null') 4145 sms = shared_memory.SharedMemory(name_tsmb, create=True, size=512) 4146 self.addCleanup(sms.unlink) 4147 with self.assertRaises(ValueError): 4148 shared_memory.SharedMemory(name_tsmb + '\0a', create=False, size=512) 4149 if shared_memory._USE_POSIX: 4150 orig_name = sms._name 4151 try: 4152 sms._name = orig_name + '\0a' 4153 with self.assertRaises(ValueError): 4154 sms.unlink() 4155 finally: 4156 sms._name = orig_name 4157 4158 def test_shared_memory_basics(self): 4159 name_tsmb = self._new_shm_name('test01_tsmb') 4160 sms = shared_memory.SharedMemory(name_tsmb, create=True, size=512) 4161 self.addCleanup(sms.unlink) 4162 4163 # Verify attributes are readable. 4164 self.assertEqual(sms.name, name_tsmb) 4165 self.assertGreaterEqual(sms.size, 512) 4166 self.assertGreaterEqual(len(sms.buf), sms.size) 4167 4168 # Verify __repr__ 4169 self.assertIn(sms.name, str(sms)) 4170 self.assertIn(str(sms.size), str(sms)) 4171 4172 # Modify contents of shared memory segment through memoryview. 4173 sms.buf[0] = 42 4174 self.assertEqual(sms.buf[0], 42) 4175 4176 # Attach to existing shared memory segment. 4177 also_sms = shared_memory.SharedMemory(name_tsmb) 4178 self.assertEqual(also_sms.buf[0], 42) 4179 also_sms.close() 4180 4181 # Attach to existing shared memory segment but specify a new size. 4182 same_sms = shared_memory.SharedMemory(name_tsmb, size=20*sms.size) 4183 self.assertLess(same_sms.size, 20*sms.size) # Size was ignored. 4184 same_sms.close() 4185 4186 # Creating Shared Memory Segment with -ve size 4187 with self.assertRaises(ValueError): 4188 shared_memory.SharedMemory(create=True, size=-2) 4189 4190 # Attaching Shared Memory Segment without a name 4191 with self.assertRaises(ValueError): 4192 shared_memory.SharedMemory(create=False) 4193 4194 # Test if shared memory segment is created properly, 4195 # when _make_filename returns an existing shared memory segment name 4196 with unittest.mock.patch( 4197 'multiprocessing.shared_memory._make_filename') as mock_make_filename: 4198 4199 NAME_PREFIX = shared_memory._SHM_NAME_PREFIX 4200 names = [self._new_shm_name('test01_fn'), self._new_shm_name('test02_fn')] 4201 # Prepend NAME_PREFIX which can be '/psm_' or 'wnsm_', necessary 4202 # because some POSIX compliant systems require name to start with / 4203 names = [NAME_PREFIX + name for name in names] 4204 4205 mock_make_filename.side_effect = names 4206 shm1 = shared_memory.SharedMemory(create=True, size=1) 4207 self.addCleanup(shm1.unlink) 4208 self.assertEqual(shm1._name, names[0]) 4209 4210 mock_make_filename.side_effect = names 4211 shm2 = shared_memory.SharedMemory(create=True, size=1) 4212 self.addCleanup(shm2.unlink) 4213 self.assertEqual(shm2._name, names[1]) 4214 4215 if shared_memory._USE_POSIX: 4216 # Posix Shared Memory can only be unlinked once. Here we 4217 # test an implementation detail that is not observed across 4218 # all supported platforms (since WindowsNamedSharedMemory 4219 # manages unlinking on its own and unlink() does nothing). 4220 # True release of shared memory segment does not necessarily 4221 # happen until process exits, depending on the OS platform. 4222 name_dblunlink = self._new_shm_name('test01_dblunlink') 4223 sms_uno = shared_memory.SharedMemory( 4224 name_dblunlink, 4225 create=True, 4226 size=5000 4227 ) 4228 with self.assertRaises(FileNotFoundError): 4229 try: 4230 self.assertGreaterEqual(sms_uno.size, 5000) 4231 4232 sms_duo = shared_memory.SharedMemory(name_dblunlink) 4233 sms_duo.unlink() # First shm_unlink() call. 4234 sms_duo.close() 4235 sms_uno.close() 4236 4237 finally: 4238 sms_uno.unlink() # A second shm_unlink() call is bad. 4239 4240 with self.assertRaises(FileExistsError): 4241 # Attempting to create a new shared memory segment with a 4242 # name that is already in use triggers an exception. 4243 there_can_only_be_one_sms = shared_memory.SharedMemory( 4244 name_tsmb, 4245 create=True, 4246 size=512 4247 ) 4248 4249 if shared_memory._USE_POSIX: 4250 # Requesting creation of a shared memory segment with the option 4251 # to attach to an existing segment, if that name is currently in 4252 # use, should not trigger an exception. 4253 # Note: Using a smaller size could possibly cause truncation of 4254 # the existing segment but is OS platform dependent. In the 4255 # case of MacOS/darwin, requesting a smaller size is disallowed. 4256 class OptionalAttachSharedMemory(shared_memory.SharedMemory): 4257 _flags = os.O_CREAT | os.O_RDWR 4258 ok_if_exists_sms = OptionalAttachSharedMemory(name_tsmb) 4259 self.assertEqual(ok_if_exists_sms.size, sms.size) 4260 ok_if_exists_sms.close() 4261 4262 # Attempting to attach to an existing shared memory segment when 4263 # no segment exists with the supplied name triggers an exception. 4264 with self.assertRaises(FileNotFoundError): 4265 nonexisting_sms = shared_memory.SharedMemory('test01_notthere') 4266 nonexisting_sms.unlink() # Error should occur on prior line. 4267 4268 sms.close() 4269 4270 def test_shared_memory_recreate(self): 4271 # Test if shared memory segment is created properly, 4272 # when _make_filename returns an existing shared memory segment name 4273 with unittest.mock.patch( 4274 'multiprocessing.shared_memory._make_filename') as mock_make_filename: 4275 4276 NAME_PREFIX = shared_memory._SHM_NAME_PREFIX 4277 names = [self._new_shm_name('test03_fn'), self._new_shm_name('test04_fn')] 4278 # Prepend NAME_PREFIX which can be '/psm_' or 'wnsm_', necessary 4279 # because some POSIX compliant systems require name to start with / 4280 names = [NAME_PREFIX + name for name in names] 4281 4282 mock_make_filename.side_effect = names 4283 shm1 = shared_memory.SharedMemory(create=True, size=1) 4284 self.addCleanup(shm1.unlink) 4285 self.assertEqual(shm1._name, names[0]) 4286 4287 mock_make_filename.side_effect = names 4288 shm2 = shared_memory.SharedMemory(create=True, size=1) 4289 self.addCleanup(shm2.unlink) 4290 self.assertEqual(shm2._name, names[1]) 4291 4292 def test_invalid_shared_memory_creation(self): 4293 # Test creating a shared memory segment with negative size 4294 with self.assertRaises(ValueError): 4295 sms_invalid = shared_memory.SharedMemory(create=True, size=-1) 4296 4297 # Test creating a shared memory segment with size 0 4298 with self.assertRaises(ValueError): 4299 sms_invalid = shared_memory.SharedMemory(create=True, size=0) 4300 4301 # Test creating a shared memory segment without size argument 4302 with self.assertRaises(ValueError): 4303 sms_invalid = shared_memory.SharedMemory(create=True) 4304 4305 def test_shared_memory_pickle_unpickle(self): 4306 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 4307 with self.subTest(proto=proto): 4308 sms = shared_memory.SharedMemory(create=True, size=512) 4309 self.addCleanup(sms.unlink) 4310 sms.buf[0:6] = b'pickle' 4311 4312 # Test pickling 4313 pickled_sms = pickle.dumps(sms, protocol=proto) 4314 4315 # Test unpickling 4316 sms2 = pickle.loads(pickled_sms) 4317 self.assertIsInstance(sms2, shared_memory.SharedMemory) 4318 self.assertEqual(sms.name, sms2.name) 4319 self.assertEqual(bytes(sms.buf[0:6]), b'pickle') 4320 self.assertEqual(bytes(sms2.buf[0:6]), b'pickle') 4321 4322 # Test that unpickled version is still the same SharedMemory 4323 sms.buf[0:6] = b'newval' 4324 self.assertEqual(bytes(sms.buf[0:6]), b'newval') 4325 self.assertEqual(bytes(sms2.buf[0:6]), b'newval') 4326 4327 sms2.buf[0:6] = b'oldval' 4328 self.assertEqual(bytes(sms.buf[0:6]), b'oldval') 4329 self.assertEqual(bytes(sms2.buf[0:6]), b'oldval') 4330 4331 def test_shared_memory_pickle_unpickle_dead_object(self): 4332 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 4333 with self.subTest(proto=proto): 4334 sms = shared_memory.SharedMemory(create=True, size=512) 4335 sms.buf[0:6] = b'pickle' 4336 pickled_sms = pickle.dumps(sms, protocol=proto) 4337 4338 # Now, we are going to kill the original object. 4339 # So, unpickled one won't be able to attach to it. 4340 sms.close() 4341 sms.unlink() 4342 4343 with self.assertRaises(FileNotFoundError): 4344 pickle.loads(pickled_sms) 4345 4346 def test_shared_memory_across_processes(self): 4347 # bpo-40135: don't define shared memory block's name in case of 4348 # the failure when we run multiprocessing tests in parallel. 4349 sms = shared_memory.SharedMemory(create=True, size=512) 4350 self.addCleanup(sms.unlink) 4351 4352 # Verify remote attachment to existing block by name is working. 4353 p = self.Process( 4354 target=self._attach_existing_shmem_then_write, 4355 args=(sms.name, b'howdy') 4356 ) 4357 p.daemon = True 4358 p.start() 4359 p.join() 4360 self.assertEqual(bytes(sms.buf[:5]), b'howdy') 4361 4362 # Verify pickling of SharedMemory instance also works. 4363 p = self.Process( 4364 target=self._attach_existing_shmem_then_write, 4365 args=(sms, b'HELLO') 4366 ) 4367 p.daemon = True 4368 p.start() 4369 p.join() 4370 self.assertEqual(bytes(sms.buf[:5]), b'HELLO') 4371 4372 sms.close() 4373 4374 @unittest.skipIf(os.name != "posix", "not feasible in non-posix platforms") 4375 def test_shared_memory_SharedMemoryServer_ignores_sigint(self): 4376 # bpo-36368: protect SharedMemoryManager server process from 4377 # KeyboardInterrupt signals. 4378 smm = multiprocessing.managers.SharedMemoryManager() 4379 smm.start() 4380 4381 # make sure the manager works properly at the beginning 4382 sl = smm.ShareableList(range(10)) 4383 4384 # the manager's server should ignore KeyboardInterrupt signals, and 4385 # maintain its connection with the current process, and success when 4386 # asked to deliver memory segments. 4387 os.kill(smm._process.pid, signal.SIGINT) 4388 4389 sl2 = smm.ShareableList(range(10)) 4390 4391 # test that the custom signal handler registered in the Manager does 4392 # not affect signal handling in the parent process. 4393 with self.assertRaises(KeyboardInterrupt): 4394 os.kill(os.getpid(), signal.SIGINT) 4395 4396 smm.shutdown() 4397 4398 @unittest.skipIf(os.name != "posix", "resource_tracker is posix only") 4399 def test_shared_memory_SharedMemoryManager_reuses_resource_tracker(self): 4400 # bpo-36867: test that a SharedMemoryManager uses the 4401 # same resource_tracker process as its parent. 4402 cmd = '''if 1: 4403 from multiprocessing.managers import SharedMemoryManager 4404 4405 4406 smm = SharedMemoryManager() 4407 smm.start() 4408 sl = smm.ShareableList(range(10)) 4409 smm.shutdown() 4410 ''' 4411 rc, out, err = test.support.script_helper.assert_python_ok('-c', cmd) 4412 4413 # Before bpo-36867 was fixed, a SharedMemoryManager not using the same 4414 # resource_tracker process as its parent would make the parent's 4415 # tracker complain about sl being leaked even though smm.shutdown() 4416 # properly released sl. 4417 self.assertFalse(err) 4418 4419 def test_shared_memory_SharedMemoryManager_basics(self): 4420 smm1 = multiprocessing.managers.SharedMemoryManager() 4421 with self.assertRaises(ValueError): 4422 smm1.SharedMemory(size=9) # Fails if SharedMemoryServer not started 4423 smm1.start() 4424 lol = [ smm1.ShareableList(range(i)) for i in range(5, 10) ] 4425 lom = [ smm1.SharedMemory(size=j) for j in range(32, 128, 16) ] 4426 doppleganger_list0 = shared_memory.ShareableList(name=lol[0].shm.name) 4427 self.assertEqual(len(doppleganger_list0), 5) 4428 doppleganger_shm0 = shared_memory.SharedMemory(name=lom[0].name) 4429 self.assertGreaterEqual(len(doppleganger_shm0.buf), 32) 4430 held_name = lom[0].name 4431 smm1.shutdown() 4432 if sys.platform != "win32": 4433 # Calls to unlink() have no effect on Windows platform; shared 4434 # memory will only be released once final process exits. 4435 with self.assertRaises(FileNotFoundError): 4436 # No longer there to be attached to again. 4437 absent_shm = shared_memory.SharedMemory(name=held_name) 4438 4439 with multiprocessing.managers.SharedMemoryManager() as smm2: 4440 sl = smm2.ShareableList("howdy") 4441 shm = smm2.SharedMemory(size=128) 4442 held_name = sl.shm.name 4443 if sys.platform != "win32": 4444 with self.assertRaises(FileNotFoundError): 4445 # No longer there to be attached to again. 4446 absent_sl = shared_memory.ShareableList(name=held_name) 4447 4448 4449 def test_shared_memory_ShareableList_basics(self): 4450 sl = shared_memory.ShareableList( 4451 ['howdy', b'HoWdY', -273.154, 100, None, True, 42] 4452 ) 4453 self.addCleanup(sl.shm.unlink) 4454 4455 # Verify __repr__ 4456 self.assertIn(sl.shm.name, str(sl)) 4457 self.assertIn(str(list(sl)), str(sl)) 4458 4459 # Index Out of Range (get) 4460 with self.assertRaises(IndexError): 4461 sl[7] 4462 4463 # Index Out of Range (set) 4464 with self.assertRaises(IndexError): 4465 sl[7] = 2 4466 4467 # Assign value without format change (str -> str) 4468 current_format = sl._get_packing_format(0) 4469 sl[0] = 'howdy' 4470 self.assertEqual(current_format, sl._get_packing_format(0)) 4471 4472 # Verify attributes are readable. 4473 self.assertEqual(sl.format, '8s8sdqxxxxxx?xxxxxxxx?q') 4474 4475 # Exercise len(). 4476 self.assertEqual(len(sl), 7) 4477 4478 # Exercise index(). 4479 with warnings.catch_warnings(): 4480 # Suppress BytesWarning when comparing against b'HoWdY'. 4481 warnings.simplefilter('ignore') 4482 with self.assertRaises(ValueError): 4483 sl.index('100') 4484 self.assertEqual(sl.index(100), 3) 4485 4486 # Exercise retrieving individual values. 4487 self.assertEqual(sl[0], 'howdy') 4488 self.assertEqual(sl[-2], True) 4489 4490 # Exercise iterability. 4491 self.assertEqual( 4492 tuple(sl), 4493 ('howdy', b'HoWdY', -273.154, 100, None, True, 42) 4494 ) 4495 4496 # Exercise modifying individual values. 4497 sl[3] = 42 4498 self.assertEqual(sl[3], 42) 4499 sl[4] = 'some' # Change type at a given position. 4500 self.assertEqual(sl[4], 'some') 4501 self.assertEqual(sl.format, '8s8sdq8sxxxxxxx?q') 4502 with self.assertRaisesRegex(ValueError, 4503 "exceeds available storage"): 4504 sl[4] = 'far too many' 4505 self.assertEqual(sl[4], 'some') 4506 sl[0] = 'encodés' # Exactly 8 bytes of UTF-8 data 4507 self.assertEqual(sl[0], 'encodés') 4508 self.assertEqual(sl[1], b'HoWdY') # no spillage 4509 with self.assertRaisesRegex(ValueError, 4510 "exceeds available storage"): 4511 sl[0] = 'encodées' # Exactly 9 bytes of UTF-8 data 4512 self.assertEqual(sl[1], b'HoWdY') 4513 with self.assertRaisesRegex(ValueError, 4514 "exceeds available storage"): 4515 sl[1] = b'123456789' 4516 self.assertEqual(sl[1], b'HoWdY') 4517 4518 # Exercise count(). 4519 with warnings.catch_warnings(): 4520 # Suppress BytesWarning when comparing against b'HoWdY'. 4521 warnings.simplefilter('ignore') 4522 self.assertEqual(sl.count(42), 2) 4523 self.assertEqual(sl.count(b'HoWdY'), 1) 4524 self.assertEqual(sl.count(b'adios'), 0) 4525 4526 # Exercise creating a duplicate. 4527 name_duplicate = self._new_shm_name('test03_duplicate') 4528 sl_copy = shared_memory.ShareableList(sl, name=name_duplicate) 4529 try: 4530 self.assertNotEqual(sl.shm.name, sl_copy.shm.name) 4531 self.assertEqual(name_duplicate, sl_copy.shm.name) 4532 self.assertEqual(list(sl), list(sl_copy)) 4533 self.assertEqual(sl.format, sl_copy.format) 4534 sl_copy[-1] = 77 4535 self.assertEqual(sl_copy[-1], 77) 4536 self.assertNotEqual(sl[-1], 77) 4537 sl_copy.shm.close() 4538 finally: 4539 sl_copy.shm.unlink() 4540 4541 # Obtain a second handle on the same ShareableList. 4542 sl_tethered = shared_memory.ShareableList(name=sl.shm.name) 4543 self.assertEqual(sl.shm.name, sl_tethered.shm.name) 4544 sl_tethered[-1] = 880 4545 self.assertEqual(sl[-1], 880) 4546 sl_tethered.shm.close() 4547 4548 sl.shm.close() 4549 4550 # Exercise creating an empty ShareableList. 4551 empty_sl = shared_memory.ShareableList() 4552 try: 4553 self.assertEqual(len(empty_sl), 0) 4554 self.assertEqual(empty_sl.format, '') 4555 self.assertEqual(empty_sl.count('any'), 0) 4556 with self.assertRaises(ValueError): 4557 empty_sl.index(None) 4558 empty_sl.shm.close() 4559 finally: 4560 empty_sl.shm.unlink() 4561 4562 def test_shared_memory_ShareableList_pickling(self): 4563 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 4564 with self.subTest(proto=proto): 4565 sl = shared_memory.ShareableList(range(10)) 4566 self.addCleanup(sl.shm.unlink) 4567 4568 serialized_sl = pickle.dumps(sl, protocol=proto) 4569 deserialized_sl = pickle.loads(serialized_sl) 4570 self.assertIsInstance( 4571 deserialized_sl, shared_memory.ShareableList) 4572 self.assertEqual(deserialized_sl[-1], 9) 4573 self.assertIsNot(sl, deserialized_sl) 4574 4575 deserialized_sl[4] = "changed" 4576 self.assertEqual(sl[4], "changed") 4577 sl[3] = "newvalue" 4578 self.assertEqual(deserialized_sl[3], "newvalue") 4579 4580 larger_sl = shared_memory.ShareableList(range(400)) 4581 self.addCleanup(larger_sl.shm.unlink) 4582 serialized_larger_sl = pickle.dumps(larger_sl, protocol=proto) 4583 self.assertEqual(len(serialized_sl), len(serialized_larger_sl)) 4584 larger_sl.shm.close() 4585 4586 deserialized_sl.shm.close() 4587 sl.shm.close() 4588 4589 def test_shared_memory_ShareableList_pickling_dead_object(self): 4590 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 4591 with self.subTest(proto=proto): 4592 sl = shared_memory.ShareableList(range(10)) 4593 serialized_sl = pickle.dumps(sl, protocol=proto) 4594 4595 # Now, we are going to kill the original object. 4596 # So, unpickled one won't be able to attach to it. 4597 sl.shm.close() 4598 sl.shm.unlink() 4599 4600 with self.assertRaises(FileNotFoundError): 4601 pickle.loads(serialized_sl) 4602 4603 def test_shared_memory_cleaned_after_process_termination(self): 4604 cmd = '''if 1: 4605 import os, time, sys 4606 from multiprocessing import shared_memory 4607 4608 # Create a shared_memory segment, and send the segment name 4609 sm = shared_memory.SharedMemory(create=True, size=10) 4610 sys.stdout.write(sm.name + '\\n') 4611 sys.stdout.flush() 4612 time.sleep(100) 4613 ''' 4614 with subprocess.Popen([sys.executable, '-E', '-c', cmd], 4615 stdout=subprocess.PIPE, 4616 stderr=subprocess.PIPE) as p: 4617 name = p.stdout.readline().strip().decode() 4618 4619 # killing abruptly processes holding reference to a shared memory 4620 # segment should not leak the given memory segment. 4621 p.terminate() 4622 p.wait() 4623 4624 err_msg = ("A SharedMemory segment was leaked after " 4625 "a process was abruptly terminated") 4626 for _ in support.sleeping_retry(support.LONG_TIMEOUT, err_msg): 4627 try: 4628 smm = shared_memory.SharedMemory(name, create=False) 4629 except FileNotFoundError: 4630 break 4631 4632 if os.name == 'posix': 4633 # Without this line it was raising warnings like: 4634 # UserWarning: resource_tracker: 4635 # There appear to be 1 leaked shared_memory 4636 # objects to clean up at shutdown 4637 # See: https://bugs.python.org/issue45209 4638 resource_tracker.unregister(f"/{name}", "shared_memory") 4639 4640 # A warning was emitted by the subprocess' own 4641 # resource_tracker (on Windows, shared memory segments 4642 # are released automatically by the OS). 4643 err = p.stderr.read().decode() 4644 self.assertIn( 4645 "resource_tracker: There appear to be 1 leaked " 4646 "shared_memory objects to clean up at shutdown", err) 4647 4648 @unittest.skipIf(os.name != "posix", "resource_tracker is posix only") 4649 def test_shared_memory_untracking(self): 4650 # gh-82300: When a separate Python process accesses shared memory 4651 # with track=False, it must not cause the memory to be deleted 4652 # when terminating. 4653 cmd = '''if 1: 4654 import sys 4655 from multiprocessing.shared_memory import SharedMemory 4656 mem = SharedMemory(create=False, name=sys.argv[1], track=False) 4657 mem.close() 4658 ''' 4659 mem = shared_memory.SharedMemory(create=True, size=10) 4660 # The resource tracker shares pipes with the subprocess, and so 4661 # err existing means that the tracker process has terminated now. 4662 try: 4663 rc, out, err = script_helper.assert_python_ok("-c", cmd, mem.name) 4664 self.assertNotIn(b"resource_tracker", err) 4665 self.assertEqual(rc, 0) 4666 mem2 = shared_memory.SharedMemory(create=False, name=mem.name) 4667 mem2.close() 4668 finally: 4669 try: 4670 mem.unlink() 4671 except OSError: 4672 pass 4673 mem.close() 4674 4675 @unittest.skipIf(os.name != "posix", "resource_tracker is posix only") 4676 def test_shared_memory_tracking(self): 4677 # gh-82300: When a separate Python process accesses shared memory 4678 # with track=True, it must cause the memory to be deleted when 4679 # terminating. 4680 cmd = '''if 1: 4681 import sys 4682 from multiprocessing.shared_memory import SharedMemory 4683 mem = SharedMemory(create=False, name=sys.argv[1], track=True) 4684 mem.close() 4685 ''' 4686 mem = shared_memory.SharedMemory(create=True, size=10) 4687 try: 4688 rc, out, err = script_helper.assert_python_ok("-c", cmd, mem.name) 4689 self.assertEqual(rc, 0) 4690 self.assertIn( 4691 b"resource_tracker: There appear to be 1 leaked " 4692 b"shared_memory objects to clean up at shutdown", err) 4693 finally: 4694 try: 4695 mem.unlink() 4696 except OSError: 4697 pass 4698 resource_tracker.unregister(mem._name, "shared_memory") 4699 mem.close() 4700 4701# 4702# Test to verify that `Finalize` works. 4703# 4704 4705class _TestFinalize(BaseTestCase): 4706 4707 ALLOWED_TYPES = ('processes',) 4708 4709 def setUp(self): 4710 self.registry_backup = util._finalizer_registry.copy() 4711 util._finalizer_registry.clear() 4712 4713 def tearDown(self): 4714 gc.collect() # For PyPy or other GCs. 4715 self.assertFalse(util._finalizer_registry) 4716 util._finalizer_registry.update(self.registry_backup) 4717 4718 @classmethod 4719 def _test_finalize(cls, conn): 4720 class Foo(object): 4721 pass 4722 4723 a = Foo() 4724 util.Finalize(a, conn.send, args=('a',)) 4725 del a # triggers callback for a 4726 gc.collect() # For PyPy or other GCs. 4727 4728 b = Foo() 4729 close_b = util.Finalize(b, conn.send, args=('b',)) 4730 close_b() # triggers callback for b 4731 close_b() # does nothing because callback has already been called 4732 del b # does nothing because callback has already been called 4733 gc.collect() # For PyPy or other GCs. 4734 4735 c = Foo() 4736 util.Finalize(c, conn.send, args=('c',)) 4737 4738 d10 = Foo() 4739 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1) 4740 4741 d01 = Foo() 4742 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0) 4743 d02 = Foo() 4744 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0) 4745 d03 = Foo() 4746 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0) 4747 4748 util.Finalize(None, conn.send, args=('e',), exitpriority=-10) 4749 4750 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100) 4751 4752 # call multiprocessing's cleanup function then exit process without 4753 # garbage collecting locals 4754 util._exit_function() 4755 conn.close() 4756 os._exit(0) 4757 4758 def test_finalize(self): 4759 conn, child_conn = self.Pipe() 4760 4761 p = self.Process(target=self._test_finalize, args=(child_conn,)) 4762 p.daemon = True 4763 p.start() 4764 p.join() 4765 4766 result = [obj for obj in iter(conn.recv, 'STOP')] 4767 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e']) 4768 4769 @support.requires_resource('cpu') 4770 def test_thread_safety(self): 4771 # bpo-24484: _run_finalizers() should be thread-safe 4772 def cb(): 4773 pass 4774 4775 class Foo(object): 4776 def __init__(self): 4777 self.ref = self # create reference cycle 4778 # insert finalizer at random key 4779 util.Finalize(self, cb, exitpriority=random.randint(1, 100)) 4780 4781 finish = False 4782 exc = None 4783 4784 def run_finalizers(): 4785 nonlocal exc 4786 while not finish: 4787 time.sleep(random.random() * 1e-1) 4788 try: 4789 # A GC run will eventually happen during this, 4790 # collecting stale Foo's and mutating the registry 4791 util._run_finalizers() 4792 except Exception as e: 4793 exc = e 4794 4795 def make_finalizers(): 4796 nonlocal exc 4797 d = {} 4798 while not finish: 4799 try: 4800 # Old Foo's get gradually replaced and later 4801 # collected by the GC (because of the cyclic ref) 4802 d[random.getrandbits(5)] = {Foo() for i in range(10)} 4803 except Exception as e: 4804 exc = e 4805 d.clear() 4806 4807 old_interval = sys.getswitchinterval() 4808 old_threshold = gc.get_threshold() 4809 try: 4810 support.setswitchinterval(1e-6) 4811 gc.set_threshold(5, 5, 5) 4812 threads = [threading.Thread(target=run_finalizers), 4813 threading.Thread(target=make_finalizers)] 4814 with threading_helper.start_threads(threads): 4815 time.sleep(4.0) # Wait a bit to trigger race condition 4816 finish = True 4817 if exc is not None: 4818 raise exc 4819 finally: 4820 sys.setswitchinterval(old_interval) 4821 gc.set_threshold(*old_threshold) 4822 gc.collect() # Collect remaining Foo's 4823 4824 4825# 4826# Test that from ... import * works for each module 4827# 4828 4829class _TestImportStar(unittest.TestCase): 4830 4831 def get_module_names(self): 4832 import glob 4833 folder = os.path.dirname(multiprocessing.__file__) 4834 pattern = os.path.join(glob.escape(folder), '*.py') 4835 files = glob.glob(pattern) 4836 modules = [os.path.splitext(os.path.split(f)[1])[0] for f in files] 4837 modules = ['multiprocessing.' + m for m in modules] 4838 modules.remove('multiprocessing.__init__') 4839 modules.append('multiprocessing') 4840 return modules 4841 4842 def test_import(self): 4843 modules = self.get_module_names() 4844 if sys.platform == 'win32': 4845 modules.remove('multiprocessing.popen_fork') 4846 modules.remove('multiprocessing.popen_forkserver') 4847 modules.remove('multiprocessing.popen_spawn_posix') 4848 else: 4849 modules.remove('multiprocessing.popen_spawn_win32') 4850 if not HAS_REDUCTION: 4851 modules.remove('multiprocessing.popen_forkserver') 4852 4853 if c_int is None: 4854 # This module requires _ctypes 4855 modules.remove('multiprocessing.sharedctypes') 4856 4857 for name in modules: 4858 __import__(name) 4859 mod = sys.modules[name] 4860 self.assertTrue(hasattr(mod, '__all__'), name) 4861 4862 for attr in mod.__all__: 4863 self.assertTrue( 4864 hasattr(mod, attr), 4865 '%r does not have attribute %r' % (mod, attr) 4866 ) 4867 4868# 4869# Quick test that logging works -- does not test logging output 4870# 4871 4872class _TestLogging(BaseTestCase): 4873 4874 ALLOWED_TYPES = ('processes',) 4875 4876 def test_enable_logging(self): 4877 logger = multiprocessing.get_logger() 4878 logger.setLevel(util.SUBWARNING) 4879 self.assertTrue(logger is not None) 4880 logger.debug('this will not be printed') 4881 logger.info('nor will this') 4882 logger.setLevel(LOG_LEVEL) 4883 4884 @classmethod 4885 def _test_level(cls, conn): 4886 logger = multiprocessing.get_logger() 4887 conn.send(logger.getEffectiveLevel()) 4888 4889 def test_level(self): 4890 LEVEL1 = 32 4891 LEVEL2 = 37 4892 4893 logger = multiprocessing.get_logger() 4894 root_logger = logging.getLogger() 4895 root_level = root_logger.level 4896 4897 reader, writer = multiprocessing.Pipe(duplex=False) 4898 4899 logger.setLevel(LEVEL1) 4900 p = self.Process(target=self._test_level, args=(writer,)) 4901 p.start() 4902 self.assertEqual(LEVEL1, reader.recv()) 4903 p.join() 4904 p.close() 4905 4906 logger.setLevel(logging.NOTSET) 4907 root_logger.setLevel(LEVEL2) 4908 p = self.Process(target=self._test_level, args=(writer,)) 4909 p.start() 4910 self.assertEqual(LEVEL2, reader.recv()) 4911 p.join() 4912 p.close() 4913 4914 root_logger.setLevel(root_level) 4915 logger.setLevel(level=LOG_LEVEL) 4916 4917 def test_filename(self): 4918 logger = multiprocessing.get_logger() 4919 original_level = logger.level 4920 try: 4921 logger.setLevel(util.DEBUG) 4922 stream = io.StringIO() 4923 handler = logging.StreamHandler(stream) 4924 logging_format = '[%(levelname)s] [%(filename)s] %(message)s' 4925 handler.setFormatter(logging.Formatter(logging_format)) 4926 logger.addHandler(handler) 4927 logger.info('1') 4928 util.info('2') 4929 logger.debug('3') 4930 filename = os.path.basename(__file__) 4931 log_record = stream.getvalue() 4932 self.assertIn(f'[INFO] [{filename}] 1', log_record) 4933 self.assertIn(f'[INFO] [{filename}] 2', log_record) 4934 self.assertIn(f'[DEBUG] [{filename}] 3', log_record) 4935 finally: 4936 logger.setLevel(original_level) 4937 logger.removeHandler(handler) 4938 handler.close() 4939 4940 4941# class _TestLoggingProcessName(BaseTestCase): 4942# 4943# def handle(self, record): 4944# assert record.processName == multiprocessing.current_process().name 4945# self.__handled = True 4946# 4947# def test_logging(self): 4948# handler = logging.Handler() 4949# handler.handle = self.handle 4950# self.__handled = False 4951# # Bypass getLogger() and side-effects 4952# logger = logging.getLoggerClass()( 4953# 'multiprocessing.test.TestLoggingProcessName') 4954# logger.addHandler(handler) 4955# logger.propagate = False 4956# 4957# logger.warn('foo') 4958# assert self.__handled 4959 4960# 4961# Check that Process.join() retries if os.waitpid() fails with EINTR 4962# 4963 4964class _TestPollEintr(BaseTestCase): 4965 4966 ALLOWED_TYPES = ('processes',) 4967 4968 @classmethod 4969 def _killer(cls, pid): 4970 time.sleep(0.1) 4971 os.kill(pid, signal.SIGUSR1) 4972 4973 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') 4974 def test_poll_eintr(self): 4975 got_signal = [False] 4976 def record(*args): 4977 got_signal[0] = True 4978 pid = os.getpid() 4979 oldhandler = signal.signal(signal.SIGUSR1, record) 4980 try: 4981 killer = self.Process(target=self._killer, args=(pid,)) 4982 killer.start() 4983 try: 4984 p = self.Process(target=time.sleep, args=(2,)) 4985 p.start() 4986 p.join() 4987 finally: 4988 killer.join() 4989 self.assertTrue(got_signal[0]) 4990 self.assertEqual(p.exitcode, 0) 4991 finally: 4992 signal.signal(signal.SIGUSR1, oldhandler) 4993 4994# 4995# Test to verify handle verification, see issue 3321 4996# 4997 4998class TestInvalidHandle(unittest.TestCase): 4999 5000 @unittest.skipIf(WIN32, "skipped on Windows") 5001 def test_invalid_handles(self): 5002 conn = multiprocessing.connection.Connection(44977608) 5003 # check that poll() doesn't crash 5004 try: 5005 conn.poll() 5006 except (ValueError, OSError): 5007 pass 5008 finally: 5009 # Hack private attribute _handle to avoid printing an error 5010 # in conn.__del__ 5011 conn._handle = None 5012 self.assertRaises((ValueError, OSError), 5013 multiprocessing.connection.Connection, -1) 5014 5015 5016 5017@hashlib_helper.requires_hashdigest('sha256') 5018class OtherTest(unittest.TestCase): 5019 # TODO: add more tests for deliver/answer challenge. 5020 def test_deliver_challenge_auth_failure(self): 5021 class _FakeConnection(object): 5022 def recv_bytes(self, size): 5023 return b'something bogus' 5024 def send_bytes(self, data): 5025 pass 5026 self.assertRaises(multiprocessing.AuthenticationError, 5027 multiprocessing.connection.deliver_challenge, 5028 _FakeConnection(), b'abc') 5029 5030 def test_answer_challenge_auth_failure(self): 5031 class _FakeConnection(object): 5032 def __init__(self): 5033 self.count = 0 5034 def recv_bytes(self, size): 5035 self.count += 1 5036 if self.count == 1: 5037 return multiprocessing.connection._CHALLENGE 5038 elif self.count == 2: 5039 return b'something bogus' 5040 return b'' 5041 def send_bytes(self, data): 5042 pass 5043 self.assertRaises(multiprocessing.AuthenticationError, 5044 multiprocessing.connection.answer_challenge, 5045 _FakeConnection(), b'abc') 5046 5047 5048@hashlib_helper.requires_hashdigest('md5') 5049@hashlib_helper.requires_hashdigest('sha256') 5050class ChallengeResponseTest(unittest.TestCase): 5051 authkey = b'supadupasecretkey' 5052 5053 def create_response(self, message): 5054 return multiprocessing.connection._create_response( 5055 self.authkey, message 5056 ) 5057 5058 def verify_challenge(self, message, response): 5059 return multiprocessing.connection._verify_challenge( 5060 self.authkey, message, response 5061 ) 5062 5063 def test_challengeresponse(self): 5064 for algo in [None, "md5", "sha256"]: 5065 with self.subTest(f"{algo=}"): 5066 msg = b'is-twenty-bytes-long' # The length of a legacy message. 5067 if algo: 5068 prefix = b'{%s}' % algo.encode("ascii") 5069 else: 5070 prefix = b'' 5071 msg = prefix + msg 5072 response = self.create_response(msg) 5073 if not response.startswith(prefix): 5074 self.fail(response) 5075 self.verify_challenge(msg, response) 5076 5077 # TODO(gpshead): We need integration tests for handshakes between modern 5078 # deliver_challenge() and verify_response() code and connections running a 5079 # test-local copy of the legacy Python <=3.11 implementations. 5080 5081 # TODO(gpshead): properly annotate tests for requires_hashdigest rather than 5082 # only running these on a platform supporting everything. otherwise logic 5083 # issues preventing it from working on FIPS mode setups will be hidden. 5084 5085# 5086# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585 5087# 5088 5089def initializer(ns): 5090 ns.test += 1 5091 5092@hashlib_helper.requires_hashdigest('sha256') 5093class TestInitializers(unittest.TestCase): 5094 def setUp(self): 5095 self.mgr = multiprocessing.Manager() 5096 self.ns = self.mgr.Namespace() 5097 self.ns.test = 0 5098 5099 def tearDown(self): 5100 self.mgr.shutdown() 5101 self.mgr.join() 5102 5103 def test_manager_initializer(self): 5104 m = multiprocessing.managers.SyncManager() 5105 self.assertRaises(TypeError, m.start, 1) 5106 m.start(initializer, (self.ns,)) 5107 self.assertEqual(self.ns.test, 1) 5108 m.shutdown() 5109 m.join() 5110 5111 def test_pool_initializer(self): 5112 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1) 5113 p = multiprocessing.Pool(1, initializer, (self.ns,)) 5114 p.close() 5115 p.join() 5116 self.assertEqual(self.ns.test, 1) 5117 5118# 5119# Issue 5155, 5313, 5331: Test process in processes 5120# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior 5121# 5122 5123def _this_sub_process(q): 5124 try: 5125 item = q.get(block=False) 5126 except pyqueue.Empty: 5127 pass 5128 5129def _test_process(): 5130 queue = multiprocessing.Queue() 5131 subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,)) 5132 subProc.daemon = True 5133 subProc.start() 5134 subProc.join() 5135 5136def _afunc(x): 5137 return x*x 5138 5139def pool_in_process(): 5140 pool = multiprocessing.Pool(processes=4) 5141 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7]) 5142 pool.close() 5143 pool.join() 5144 5145class _file_like(object): 5146 def __init__(self, delegate): 5147 self._delegate = delegate 5148 self._pid = None 5149 5150 @property 5151 def cache(self): 5152 pid = os.getpid() 5153 # There are no race conditions since fork keeps only the running thread 5154 if pid != self._pid: 5155 self._pid = pid 5156 self._cache = [] 5157 return self._cache 5158 5159 def write(self, data): 5160 self.cache.append(data) 5161 5162 def flush(self): 5163 self._delegate.write(''.join(self.cache)) 5164 self._cache = [] 5165 5166class TestStdinBadfiledescriptor(unittest.TestCase): 5167 5168 def test_queue_in_process(self): 5169 proc = multiprocessing.Process(target=_test_process) 5170 proc.start() 5171 proc.join() 5172 5173 def test_pool_in_process(self): 5174 p = multiprocessing.Process(target=pool_in_process) 5175 p.start() 5176 p.join() 5177 5178 def test_flushing(self): 5179 sio = io.StringIO() 5180 flike = _file_like(sio) 5181 flike.write('foo') 5182 proc = multiprocessing.Process(target=lambda: flike.flush()) 5183 flike.flush() 5184 assert sio.getvalue() == 'foo' 5185 5186 5187class TestWait(unittest.TestCase): 5188 5189 @classmethod 5190 def _child_test_wait(cls, w, slow): 5191 for i in range(10): 5192 if slow: 5193 time.sleep(random.random() * 0.100) 5194 w.send((i, os.getpid())) 5195 w.close() 5196 5197 def test_wait(self, slow=False): 5198 from multiprocessing.connection import wait 5199 readers = [] 5200 procs = [] 5201 messages = [] 5202 5203 for i in range(4): 5204 r, w = multiprocessing.Pipe(duplex=False) 5205 p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow)) 5206 p.daemon = True 5207 p.start() 5208 w.close() 5209 readers.append(r) 5210 procs.append(p) 5211 self.addCleanup(p.join) 5212 5213 while readers: 5214 for r in wait(readers): 5215 try: 5216 msg = r.recv() 5217 except EOFError: 5218 readers.remove(r) 5219 r.close() 5220 else: 5221 messages.append(msg) 5222 5223 messages.sort() 5224 expected = sorted((i, p.pid) for i in range(10) for p in procs) 5225 self.assertEqual(messages, expected) 5226 5227 @classmethod 5228 def _child_test_wait_socket(cls, address, slow): 5229 s = socket.socket() 5230 s.connect(address) 5231 for i in range(10): 5232 if slow: 5233 time.sleep(random.random() * 0.100) 5234 s.sendall(('%s\n' % i).encode('ascii')) 5235 s.close() 5236 5237 def test_wait_socket(self, slow=False): 5238 from multiprocessing.connection import wait 5239 l = socket.create_server((socket_helper.HOST, 0)) 5240 addr = l.getsockname() 5241 readers = [] 5242 procs = [] 5243 dic = {} 5244 5245 for i in range(4): 5246 p = multiprocessing.Process(target=self._child_test_wait_socket, 5247 args=(addr, slow)) 5248 p.daemon = True 5249 p.start() 5250 procs.append(p) 5251 self.addCleanup(p.join) 5252 5253 for i in range(4): 5254 r, _ = l.accept() 5255 readers.append(r) 5256 dic[r] = [] 5257 l.close() 5258 5259 while readers: 5260 for r in wait(readers): 5261 msg = r.recv(32) 5262 if not msg: 5263 readers.remove(r) 5264 r.close() 5265 else: 5266 dic[r].append(msg) 5267 5268 expected = ''.join('%s\n' % i for i in range(10)).encode('ascii') 5269 for v in dic.values(): 5270 self.assertEqual(b''.join(v), expected) 5271 5272 def test_wait_slow(self): 5273 self.test_wait(True) 5274 5275 def test_wait_socket_slow(self): 5276 self.test_wait_socket(True) 5277 5278 @support.requires_resource('walltime') 5279 def test_wait_timeout(self): 5280 from multiprocessing.connection import wait 5281 5282 timeout = 5.0 # seconds 5283 a, b = multiprocessing.Pipe() 5284 5285 start = time.monotonic() 5286 res = wait([a, b], timeout) 5287 delta = time.monotonic() - start 5288 5289 self.assertEqual(res, []) 5290 self.assertGreater(delta, timeout - CLOCK_RES) 5291 5292 b.send(None) 5293 res = wait([a, b], 20) 5294 self.assertEqual(res, [a]) 5295 5296 @classmethod 5297 def signal_and_sleep(cls, sem, period): 5298 sem.release() 5299 time.sleep(period) 5300 5301 @support.requires_resource('walltime') 5302 def test_wait_integer(self): 5303 from multiprocessing.connection import wait 5304 5305 expected = 3 5306 sorted_ = lambda l: sorted(l, key=lambda x: id(x)) 5307 sem = multiprocessing.Semaphore(0) 5308 a, b = multiprocessing.Pipe() 5309 p = multiprocessing.Process(target=self.signal_and_sleep, 5310 args=(sem, expected)) 5311 5312 p.start() 5313 self.assertIsInstance(p.sentinel, int) 5314 self.assertTrue(sem.acquire(timeout=20)) 5315 5316 start = time.monotonic() 5317 res = wait([a, p.sentinel, b], expected + 20) 5318 delta = time.monotonic() - start 5319 5320 self.assertEqual(res, [p.sentinel]) 5321 self.assertLess(delta, expected + 2) 5322 self.assertGreater(delta, expected - 2) 5323 5324 a.send(None) 5325 5326 start = time.monotonic() 5327 res = wait([a, p.sentinel, b], 20) 5328 delta = time.monotonic() - start 5329 5330 self.assertEqual(sorted_(res), sorted_([p.sentinel, b])) 5331 self.assertLess(delta, 0.4) 5332 5333 b.send(None) 5334 5335 start = time.monotonic() 5336 res = wait([a, p.sentinel, b], 20) 5337 delta = time.monotonic() - start 5338 5339 self.assertEqual(sorted_(res), sorted_([a, p.sentinel, b])) 5340 self.assertLess(delta, 0.4) 5341 5342 p.terminate() 5343 p.join() 5344 5345 def test_neg_timeout(self): 5346 from multiprocessing.connection import wait 5347 a, b = multiprocessing.Pipe() 5348 t = time.monotonic() 5349 res = wait([a], timeout=-1) 5350 t = time.monotonic() - t 5351 self.assertEqual(res, []) 5352 self.assertLess(t, 1) 5353 a.close() 5354 b.close() 5355 5356# 5357# Issue 14151: Test invalid family on invalid environment 5358# 5359 5360class TestInvalidFamily(unittest.TestCase): 5361 5362 @unittest.skipIf(WIN32, "skipped on Windows") 5363 def test_invalid_family(self): 5364 with self.assertRaises(ValueError): 5365 multiprocessing.connection.Listener(r'\\.\test') 5366 5367 @unittest.skipUnless(WIN32, "skipped on non-Windows platforms") 5368 def test_invalid_family_win32(self): 5369 with self.assertRaises(ValueError): 5370 multiprocessing.connection.Listener('/var/test.pipe') 5371 5372# 5373# Issue 12098: check sys.flags of child matches that for parent 5374# 5375 5376class TestFlags(unittest.TestCase): 5377 @classmethod 5378 def run_in_grandchild(cls, conn): 5379 conn.send(tuple(sys.flags)) 5380 5381 @classmethod 5382 def run_in_child(cls, start_method): 5383 import json 5384 mp = multiprocessing.get_context(start_method) 5385 r, w = mp.Pipe(duplex=False) 5386 p = mp.Process(target=cls.run_in_grandchild, args=(w,)) 5387 with warnings.catch_warnings(category=DeprecationWarning): 5388 p.start() 5389 grandchild_flags = r.recv() 5390 p.join() 5391 r.close() 5392 w.close() 5393 flags = (tuple(sys.flags), grandchild_flags) 5394 print(json.dumps(flags)) 5395 5396 def test_flags(self): 5397 import json 5398 # start child process using unusual flags 5399 prog = ( 5400 'from test._test_multiprocessing import TestFlags; ' 5401 f'TestFlags.run_in_child({multiprocessing.get_start_method()!r})' 5402 ) 5403 data = subprocess.check_output( 5404 [sys.executable, '-E', '-S', '-O', '-c', prog]) 5405 child_flags, grandchild_flags = json.loads(data.decode('ascii')) 5406 self.assertEqual(child_flags, grandchild_flags) 5407 5408# 5409# Test interaction with socket timeouts - see Issue #6056 5410# 5411 5412class TestTimeouts(unittest.TestCase): 5413 @classmethod 5414 def _test_timeout(cls, child, address): 5415 time.sleep(1) 5416 child.send(123) 5417 child.close() 5418 conn = multiprocessing.connection.Client(address) 5419 conn.send(456) 5420 conn.close() 5421 5422 def test_timeout(self): 5423 old_timeout = socket.getdefaulttimeout() 5424 try: 5425 socket.setdefaulttimeout(0.1) 5426 parent, child = multiprocessing.Pipe(duplex=True) 5427 l = multiprocessing.connection.Listener(family='AF_INET') 5428 p = multiprocessing.Process(target=self._test_timeout, 5429 args=(child, l.address)) 5430 p.start() 5431 child.close() 5432 self.assertEqual(parent.recv(), 123) 5433 parent.close() 5434 conn = l.accept() 5435 self.assertEqual(conn.recv(), 456) 5436 conn.close() 5437 l.close() 5438 join_process(p) 5439 finally: 5440 socket.setdefaulttimeout(old_timeout) 5441 5442# 5443# Test what happens with no "if __name__ == '__main__'" 5444# 5445 5446class TestNoForkBomb(unittest.TestCase): 5447 def test_noforkbomb(self): 5448 sm = multiprocessing.get_start_method() 5449 name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py') 5450 if sm != 'fork': 5451 rc, out, err = test.support.script_helper.assert_python_failure(name, sm) 5452 self.assertEqual(out, b'') 5453 self.assertIn(b'RuntimeError', err) 5454 else: 5455 rc, out, err = test.support.script_helper.assert_python_ok(name, sm) 5456 self.assertEqual(out.rstrip(), b'123') 5457 self.assertEqual(err, b'') 5458 5459# 5460# Issue #17555: ForkAwareThreadLock 5461# 5462 5463class TestForkAwareThreadLock(unittest.TestCase): 5464 # We recursively start processes. Issue #17555 meant that the 5465 # after fork registry would get duplicate entries for the same 5466 # lock. The size of the registry at generation n was ~2**n. 5467 5468 @classmethod 5469 def child(cls, n, conn): 5470 if n > 1: 5471 p = multiprocessing.Process(target=cls.child, args=(n-1, conn)) 5472 p.start() 5473 conn.close() 5474 join_process(p) 5475 else: 5476 conn.send(len(util._afterfork_registry)) 5477 conn.close() 5478 5479 def test_lock(self): 5480 r, w = multiprocessing.Pipe(False) 5481 l = util.ForkAwareThreadLock() 5482 old_size = len(util._afterfork_registry) 5483 p = multiprocessing.Process(target=self.child, args=(5, w)) 5484 p.start() 5485 w.close() 5486 new_size = r.recv() 5487 join_process(p) 5488 self.assertLessEqual(new_size, old_size) 5489 5490# 5491# Check that non-forked child processes do not inherit unneeded fds/handles 5492# 5493 5494class TestCloseFds(unittest.TestCase): 5495 5496 def get_high_socket_fd(self): 5497 if WIN32: 5498 # The child process will not have any socket handles, so 5499 # calling socket.fromfd() should produce WSAENOTSOCK even 5500 # if there is a handle of the same number. 5501 return socket.socket().detach() 5502 else: 5503 # We want to produce a socket with an fd high enough that a 5504 # freshly created child process will not have any fds as high. 5505 fd = socket.socket().detach() 5506 to_close = [] 5507 while fd < 50: 5508 to_close.append(fd) 5509 fd = os.dup(fd) 5510 for x in to_close: 5511 os.close(x) 5512 return fd 5513 5514 def close(self, fd): 5515 if WIN32: 5516 socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=fd).close() 5517 else: 5518 os.close(fd) 5519 5520 @classmethod 5521 def _test_closefds(cls, conn, fd): 5522 try: 5523 s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM) 5524 except Exception as e: 5525 conn.send(e) 5526 else: 5527 s.close() 5528 conn.send(None) 5529 5530 def test_closefd(self): 5531 if not HAS_REDUCTION: 5532 raise unittest.SkipTest('requires fd pickling') 5533 5534 reader, writer = multiprocessing.Pipe() 5535 fd = self.get_high_socket_fd() 5536 try: 5537 p = multiprocessing.Process(target=self._test_closefds, 5538 args=(writer, fd)) 5539 p.start() 5540 writer.close() 5541 e = reader.recv() 5542 join_process(p) 5543 finally: 5544 self.close(fd) 5545 writer.close() 5546 reader.close() 5547 5548 if multiprocessing.get_start_method() == 'fork': 5549 self.assertIs(e, None) 5550 else: 5551 WSAENOTSOCK = 10038 5552 self.assertIsInstance(e, OSError) 5553 self.assertTrue(e.errno == errno.EBADF or 5554 e.winerror == WSAENOTSOCK, e) 5555 5556# 5557# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc 5558# 5559 5560class TestIgnoreEINTR(unittest.TestCase): 5561 5562 # Sending CONN_MAX_SIZE bytes into a multiprocessing pipe must block 5563 CONN_MAX_SIZE = max(support.PIPE_MAX_SIZE, support.SOCK_MAX_SIZE) 5564 5565 @classmethod 5566 def _test_ignore(cls, conn): 5567 def handler(signum, frame): 5568 pass 5569 signal.signal(signal.SIGUSR1, handler) 5570 conn.send('ready') 5571 x = conn.recv() 5572 conn.send(x) 5573 conn.send_bytes(b'x' * cls.CONN_MAX_SIZE) 5574 5575 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') 5576 def test_ignore(self): 5577 conn, child_conn = multiprocessing.Pipe() 5578 try: 5579 p = multiprocessing.Process(target=self._test_ignore, 5580 args=(child_conn,)) 5581 p.daemon = True 5582 p.start() 5583 child_conn.close() 5584 self.assertEqual(conn.recv(), 'ready') 5585 time.sleep(0.1) 5586 os.kill(p.pid, signal.SIGUSR1) 5587 time.sleep(0.1) 5588 conn.send(1234) 5589 self.assertEqual(conn.recv(), 1234) 5590 time.sleep(0.1) 5591 os.kill(p.pid, signal.SIGUSR1) 5592 self.assertEqual(conn.recv_bytes(), b'x' * self.CONN_MAX_SIZE) 5593 time.sleep(0.1) 5594 p.join() 5595 finally: 5596 conn.close() 5597 5598 @classmethod 5599 def _test_ignore_listener(cls, conn): 5600 def handler(signum, frame): 5601 pass 5602 signal.signal(signal.SIGUSR1, handler) 5603 with multiprocessing.connection.Listener() as l: 5604 conn.send(l.address) 5605 a = l.accept() 5606 a.send('welcome') 5607 5608 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') 5609 def test_ignore_listener(self): 5610 conn, child_conn = multiprocessing.Pipe() 5611 try: 5612 p = multiprocessing.Process(target=self._test_ignore_listener, 5613 args=(child_conn,)) 5614 p.daemon = True 5615 p.start() 5616 child_conn.close() 5617 address = conn.recv() 5618 time.sleep(0.1) 5619 os.kill(p.pid, signal.SIGUSR1) 5620 time.sleep(0.1) 5621 client = multiprocessing.connection.Client(address) 5622 self.assertEqual(client.recv(), 'welcome') 5623 p.join() 5624 finally: 5625 conn.close() 5626 5627class TestStartMethod(unittest.TestCase): 5628 @classmethod 5629 def _check_context(cls, conn): 5630 conn.send(multiprocessing.get_start_method()) 5631 5632 def check_context(self, ctx): 5633 r, w = ctx.Pipe(duplex=False) 5634 p = ctx.Process(target=self._check_context, args=(w,)) 5635 p.start() 5636 w.close() 5637 child_method = r.recv() 5638 r.close() 5639 p.join() 5640 self.assertEqual(child_method, ctx.get_start_method()) 5641 5642 def test_context(self): 5643 for method in ('fork', 'spawn', 'forkserver'): 5644 try: 5645 ctx = multiprocessing.get_context(method) 5646 except ValueError: 5647 continue 5648 self.assertEqual(ctx.get_start_method(), method) 5649 self.assertIs(ctx.get_context(), ctx) 5650 self.assertRaises(ValueError, ctx.set_start_method, 'spawn') 5651 self.assertRaises(ValueError, ctx.set_start_method, None) 5652 self.check_context(ctx) 5653 5654 def test_context_check_module_types(self): 5655 try: 5656 ctx = multiprocessing.get_context('forkserver') 5657 except ValueError: 5658 raise unittest.SkipTest('forkserver should be available') 5659 with self.assertRaisesRegex(TypeError, 'module_names must be a list of strings'): 5660 ctx.set_forkserver_preload([1, 2, 3]) 5661 5662 def test_set_get(self): 5663 multiprocessing.set_forkserver_preload(PRELOAD) 5664 count = 0 5665 old_method = multiprocessing.get_start_method() 5666 try: 5667 for method in ('fork', 'spawn', 'forkserver'): 5668 try: 5669 multiprocessing.set_start_method(method, force=True) 5670 except ValueError: 5671 continue 5672 self.assertEqual(multiprocessing.get_start_method(), method) 5673 ctx = multiprocessing.get_context() 5674 self.assertEqual(ctx.get_start_method(), method) 5675 self.assertTrue(type(ctx).__name__.lower().startswith(method)) 5676 self.assertTrue( 5677 ctx.Process.__name__.lower().startswith(method)) 5678 self.check_context(multiprocessing) 5679 count += 1 5680 finally: 5681 multiprocessing.set_start_method(old_method, force=True) 5682 self.assertGreaterEqual(count, 1) 5683 5684 def test_get_all(self): 5685 methods = multiprocessing.get_all_start_methods() 5686 if sys.platform == 'win32': 5687 self.assertEqual(methods, ['spawn']) 5688 else: 5689 self.assertTrue(methods == ['fork', 'spawn'] or 5690 methods == ['spawn', 'fork'] or 5691 methods == ['fork', 'spawn', 'forkserver'] or 5692 methods == ['spawn', 'fork', 'forkserver']) 5693 5694 def test_preload_resources(self): 5695 if multiprocessing.get_start_method() != 'forkserver': 5696 self.skipTest("test only relevant for 'forkserver' method") 5697 name = os.path.join(os.path.dirname(__file__), 'mp_preload.py') 5698 rc, out, err = test.support.script_helper.assert_python_ok(name) 5699 out = out.decode() 5700 err = err.decode() 5701 if out.rstrip() != 'ok' or err != '': 5702 print(out) 5703 print(err) 5704 self.fail("failed spawning forkserver or grandchild") 5705 5706 @unittest.skipIf(sys.platform == "win32", 5707 "Only Spawn on windows so no risk of mixing") 5708 @only_run_in_spawn_testsuite("avoids redundant testing.") 5709 def test_mixed_startmethod(self): 5710 # Fork-based locks cannot be used with spawned process 5711 for process_method in ["spawn", "forkserver"]: 5712 queue = multiprocessing.get_context("fork").Queue() 5713 process_ctx = multiprocessing.get_context(process_method) 5714 p = process_ctx.Process(target=close_queue, args=(queue,)) 5715 err_msg = "A SemLock created in a fork" 5716 with self.assertRaisesRegex(RuntimeError, err_msg): 5717 p.start() 5718 5719 # non-fork-based locks can be used with all other start methods 5720 for queue_method in ["spawn", "forkserver"]: 5721 for process_method in multiprocessing.get_all_start_methods(): 5722 queue = multiprocessing.get_context(queue_method).Queue() 5723 process_ctx = multiprocessing.get_context(process_method) 5724 p = process_ctx.Process(target=close_queue, args=(queue,)) 5725 p.start() 5726 p.join() 5727 5728 @classmethod 5729 def _put_one_in_queue(cls, queue): 5730 queue.put(1) 5731 5732 @classmethod 5733 def _put_two_and_nest_once(cls, queue): 5734 queue.put(2) 5735 process = multiprocessing.Process(target=cls._put_one_in_queue, args=(queue,)) 5736 process.start() 5737 process.join() 5738 5739 def test_nested_startmethod(self): 5740 # gh-108520: Regression test to ensure that child process can send its 5741 # arguments to another process 5742 queue = multiprocessing.Queue() 5743 5744 process = multiprocessing.Process(target=self._put_two_and_nest_once, args=(queue,)) 5745 process.start() 5746 process.join() 5747 5748 results = [] 5749 while not queue.empty(): 5750 results.append(queue.get()) 5751 5752 # gh-109706: queue.put(1) can write into the queue before queue.put(2), 5753 # there is no synchronization in the test. 5754 self.assertSetEqual(set(results), set([2, 1])) 5755 5756 5757@unittest.skipIf(sys.platform == "win32", 5758 "test semantics don't make sense on Windows") 5759class TestResourceTracker(unittest.TestCase): 5760 5761 def test_resource_tracker(self): 5762 # 5763 # Check that killing process does not leak named semaphores 5764 # 5765 cmd = '''if 1: 5766 import time, os 5767 import multiprocessing as mp 5768 from multiprocessing import resource_tracker 5769 from multiprocessing.shared_memory import SharedMemory 5770 5771 mp.set_start_method("spawn") 5772 5773 5774 def create_and_register_resource(rtype): 5775 if rtype == "semaphore": 5776 lock = mp.Lock() 5777 return lock, lock._semlock.name 5778 elif rtype == "shared_memory": 5779 sm = SharedMemory(create=True, size=10) 5780 return sm, sm._name 5781 else: 5782 raise ValueError( 5783 "Resource type {{}} not understood".format(rtype)) 5784 5785 5786 resource1, rname1 = create_and_register_resource("{rtype}") 5787 resource2, rname2 = create_and_register_resource("{rtype}") 5788 5789 os.write({w}, rname1.encode("ascii") + b"\\n") 5790 os.write({w}, rname2.encode("ascii") + b"\\n") 5791 5792 time.sleep(10) 5793 ''' 5794 for rtype in resource_tracker._CLEANUP_FUNCS: 5795 with self.subTest(rtype=rtype): 5796 if rtype in ("noop", "dummy"): 5797 # Artefact resource type used by the resource_tracker 5798 # or tests 5799 continue 5800 r, w = os.pipe() 5801 p = subprocess.Popen([sys.executable, 5802 '-E', '-c', cmd.format(w=w, rtype=rtype)], 5803 pass_fds=[w], 5804 stderr=subprocess.PIPE) 5805 os.close(w) 5806 with open(r, 'rb', closefd=True) as f: 5807 name1 = f.readline().rstrip().decode('ascii') 5808 name2 = f.readline().rstrip().decode('ascii') 5809 _resource_unlink(name1, rtype) 5810 p.terminate() 5811 p.wait() 5812 5813 err_msg = (f"A {rtype} resource was leaked after a process was " 5814 f"abruptly terminated") 5815 for _ in support.sleeping_retry(support.SHORT_TIMEOUT, 5816 err_msg): 5817 try: 5818 _resource_unlink(name2, rtype) 5819 except OSError as e: 5820 # docs say it should be ENOENT, but OSX seems to give 5821 # EINVAL 5822 self.assertIn(e.errno, (errno.ENOENT, errno.EINVAL)) 5823 break 5824 5825 err = p.stderr.read().decode('utf-8') 5826 p.stderr.close() 5827 expected = ('resource_tracker: There appear to be 2 leaked {} ' 5828 'objects'.format( 5829 rtype)) 5830 self.assertRegex(err, expected) 5831 self.assertRegex(err, r'resource_tracker: %r: \[Errno' % name1) 5832 5833 def check_resource_tracker_death(self, signum, should_die): 5834 # bpo-31310: if the semaphore tracker process has died, it should 5835 # be restarted implicitly. 5836 from multiprocessing.resource_tracker import _resource_tracker 5837 pid = _resource_tracker._pid 5838 if pid is not None: 5839 os.kill(pid, signal.SIGKILL) 5840 support.wait_process(pid, exitcode=-signal.SIGKILL) 5841 with warnings.catch_warnings(): 5842 warnings.simplefilter("ignore") 5843 _resource_tracker.ensure_running() 5844 pid = _resource_tracker._pid 5845 5846 os.kill(pid, signum) 5847 time.sleep(1.0) # give it time to die 5848 5849 ctx = multiprocessing.get_context("spawn") 5850 with warnings.catch_warnings(record=True) as all_warn: 5851 warnings.simplefilter("always") 5852 sem = ctx.Semaphore() 5853 sem.acquire() 5854 sem.release() 5855 wr = weakref.ref(sem) 5856 # ensure `sem` gets collected, which triggers communication with 5857 # the semaphore tracker 5858 del sem 5859 gc.collect() 5860 self.assertIsNone(wr()) 5861 if should_die: 5862 self.assertEqual(len(all_warn), 1) 5863 the_warn = all_warn[0] 5864 self.assertTrue(issubclass(the_warn.category, UserWarning)) 5865 self.assertTrue("resource_tracker: process died" 5866 in str(the_warn.message)) 5867 else: 5868 self.assertEqual(len(all_warn), 0) 5869 5870 def test_resource_tracker_sigint(self): 5871 # Catchable signal (ignored by semaphore tracker) 5872 self.check_resource_tracker_death(signal.SIGINT, False) 5873 5874 def test_resource_tracker_sigterm(self): 5875 # Catchable signal (ignored by semaphore tracker) 5876 self.check_resource_tracker_death(signal.SIGTERM, False) 5877 5878 @unittest.skipIf(sys.platform.startswith("netbsd"), 5879 "gh-125620: Skip on NetBSD due to long wait for SIGKILL process termination.") 5880 def test_resource_tracker_sigkill(self): 5881 # Uncatchable signal. 5882 self.check_resource_tracker_death(signal.SIGKILL, True) 5883 5884 @staticmethod 5885 def _is_resource_tracker_reused(conn, pid): 5886 from multiprocessing.resource_tracker import _resource_tracker 5887 _resource_tracker.ensure_running() 5888 # The pid should be None in the child process, expect for the fork 5889 # context. It should not be a new value. 5890 reused = _resource_tracker._pid in (None, pid) 5891 reused &= _resource_tracker._check_alive() 5892 conn.send(reused) 5893 5894 def test_resource_tracker_reused(self): 5895 from multiprocessing.resource_tracker import _resource_tracker 5896 _resource_tracker.ensure_running() 5897 pid = _resource_tracker._pid 5898 5899 r, w = multiprocessing.Pipe(duplex=False) 5900 p = multiprocessing.Process(target=self._is_resource_tracker_reused, 5901 args=(w, pid)) 5902 p.start() 5903 is_resource_tracker_reused = r.recv() 5904 5905 # Clean up 5906 p.join() 5907 w.close() 5908 r.close() 5909 5910 self.assertTrue(is_resource_tracker_reused) 5911 5912 def test_too_long_name_resource(self): 5913 # gh-96819: Resource names that will make the length of a write to a pipe 5914 # greater than PIPE_BUF are not allowed 5915 rtype = "shared_memory" 5916 too_long_name_resource = "a" * (512 - len(rtype)) 5917 with self.assertRaises(ValueError): 5918 resource_tracker.register(too_long_name_resource, rtype) 5919 5920 def _test_resource_tracker_leak_resources(self, cleanup): 5921 # We use a separate instance for testing, since the main global 5922 # _resource_tracker may be used to watch test infrastructure. 5923 from multiprocessing.resource_tracker import ResourceTracker 5924 tracker = ResourceTracker() 5925 tracker.ensure_running() 5926 self.assertTrue(tracker._check_alive()) 5927 5928 self.assertIsNone(tracker._exitcode) 5929 tracker.register('somename', 'dummy') 5930 if cleanup: 5931 tracker.unregister('somename', 'dummy') 5932 expected_exit_code = 0 5933 else: 5934 expected_exit_code = 1 5935 5936 self.assertTrue(tracker._check_alive()) 5937 self.assertIsNone(tracker._exitcode) 5938 tracker._stop() 5939 self.assertEqual(tracker._exitcode, expected_exit_code) 5940 5941 def test_resource_tracker_exit_code(self): 5942 """ 5943 Test the exit code of the resource tracker. 5944 5945 If no leaked resources were found, exit code should be 0, otherwise 1 5946 """ 5947 for cleanup in [True, False]: 5948 with self.subTest(cleanup=cleanup): 5949 self._test_resource_tracker_leak_resources( 5950 cleanup=cleanup, 5951 ) 5952 5953class TestSimpleQueue(unittest.TestCase): 5954 5955 @classmethod 5956 def _test_empty(cls, queue, child_can_start, parent_can_continue): 5957 child_can_start.wait() 5958 # issue 30301, could fail under spawn and forkserver 5959 try: 5960 queue.put(queue.empty()) 5961 queue.put(queue.empty()) 5962 finally: 5963 parent_can_continue.set() 5964 5965 def test_empty_exceptions(self): 5966 # Assert that checking emptiness of a closed queue raises 5967 # an OSError, independently of whether the queue was used 5968 # or not. This differs from Queue and JoinableQueue. 5969 q = multiprocessing.SimpleQueue() 5970 q.close() # close the pipe 5971 with self.assertRaisesRegex(OSError, 'is closed'): 5972 q.empty() 5973 5974 def test_empty(self): 5975 queue = multiprocessing.SimpleQueue() 5976 child_can_start = multiprocessing.Event() 5977 parent_can_continue = multiprocessing.Event() 5978 5979 proc = multiprocessing.Process( 5980 target=self._test_empty, 5981 args=(queue, child_can_start, parent_can_continue) 5982 ) 5983 proc.daemon = True 5984 proc.start() 5985 5986 self.assertTrue(queue.empty()) 5987 5988 child_can_start.set() 5989 parent_can_continue.wait() 5990 5991 self.assertFalse(queue.empty()) 5992 self.assertEqual(queue.get(), True) 5993 self.assertEqual(queue.get(), False) 5994 self.assertTrue(queue.empty()) 5995 5996 proc.join() 5997 5998 def test_close(self): 5999 queue = multiprocessing.SimpleQueue() 6000 queue.close() 6001 # closing a queue twice should not fail 6002 queue.close() 6003 6004 # Test specific to CPython since it tests private attributes 6005 @test.support.cpython_only 6006 def test_closed(self): 6007 queue = multiprocessing.SimpleQueue() 6008 queue.close() 6009 self.assertTrue(queue._reader.closed) 6010 self.assertTrue(queue._writer.closed) 6011 6012 6013class TestPoolNotLeakOnFailure(unittest.TestCase): 6014 6015 def test_release_unused_processes(self): 6016 # Issue #19675: During pool creation, if we can't create a process, 6017 # don't leak already created ones. 6018 will_fail_in = 3 6019 forked_processes = [] 6020 6021 class FailingForkProcess: 6022 def __init__(self, **kwargs): 6023 self.name = 'Fake Process' 6024 self.exitcode = None 6025 self.state = None 6026 forked_processes.append(self) 6027 6028 def start(self): 6029 nonlocal will_fail_in 6030 if will_fail_in <= 0: 6031 raise OSError("Manually induced OSError") 6032 will_fail_in -= 1 6033 self.state = 'started' 6034 6035 def terminate(self): 6036 self.state = 'stopping' 6037 6038 def join(self): 6039 if self.state == 'stopping': 6040 self.state = 'stopped' 6041 6042 def is_alive(self): 6043 return self.state == 'started' or self.state == 'stopping' 6044 6045 with self.assertRaisesRegex(OSError, 'Manually induced OSError'): 6046 p = multiprocessing.pool.Pool(5, context=unittest.mock.MagicMock( 6047 Process=FailingForkProcess)) 6048 p.close() 6049 p.join() 6050 self.assertFalse( 6051 any(process.is_alive() for process in forked_processes)) 6052 6053 6054@hashlib_helper.requires_hashdigest('sha256') 6055class TestSyncManagerTypes(unittest.TestCase): 6056 """Test all the types which can be shared between a parent and a 6057 child process by using a manager which acts as an intermediary 6058 between them. 6059 6060 In the following unit-tests the base type is created in the parent 6061 process, the @classmethod represents the worker process and the 6062 shared object is readable and editable between the two. 6063 6064 # The child. 6065 @classmethod 6066 def _test_list(cls, obj): 6067 assert obj[0] == 5 6068 assert obj.append(6) 6069 6070 # The parent. 6071 def test_list(self): 6072 o = self.manager.list() 6073 o.append(5) 6074 self.run_worker(self._test_list, o) 6075 assert o[1] == 6 6076 """ 6077 manager_class = multiprocessing.managers.SyncManager 6078 6079 def setUp(self): 6080 self.manager = self.manager_class() 6081 self.manager.start() 6082 self.proc = None 6083 6084 def tearDown(self): 6085 if self.proc is not None and self.proc.is_alive(): 6086 self.proc.terminate() 6087 self.proc.join() 6088 self.manager.shutdown() 6089 self.manager = None 6090 self.proc = None 6091 6092 @classmethod 6093 def setUpClass(cls): 6094 support.reap_children() 6095 6096 tearDownClass = setUpClass 6097 6098 def wait_proc_exit(self): 6099 # Only the manager process should be returned by active_children() 6100 # but this can take a bit on slow machines, so wait a few seconds 6101 # if there are other children too (see #17395). 6102 join_process(self.proc) 6103 6104 timeout = WAIT_ACTIVE_CHILDREN_TIMEOUT 6105 start_time = time.monotonic() 6106 for _ in support.sleeping_retry(timeout, error=False): 6107 if len(multiprocessing.active_children()) <= 1: 6108 break 6109 else: 6110 dt = time.monotonic() - start_time 6111 support.environment_altered = True 6112 support.print_warning(f"multiprocessing.Manager still has " 6113 f"{multiprocessing.active_children()} " 6114 f"active children after {dt:.1f} seconds") 6115 6116 def run_worker(self, worker, obj): 6117 self.proc = multiprocessing.Process(target=worker, args=(obj, )) 6118 self.proc.daemon = True 6119 self.proc.start() 6120 self.wait_proc_exit() 6121 self.assertEqual(self.proc.exitcode, 0) 6122 6123 @classmethod 6124 def _test_event(cls, obj): 6125 assert obj.is_set() 6126 obj.wait() 6127 obj.clear() 6128 obj.wait(0.001) 6129 6130 def test_event(self): 6131 o = self.manager.Event() 6132 o.set() 6133 self.run_worker(self._test_event, o) 6134 assert not o.is_set() 6135 o.wait(0.001) 6136 6137 @classmethod 6138 def _test_lock(cls, obj): 6139 obj.acquire() 6140 6141 def test_lock(self, lname="Lock"): 6142 o = getattr(self.manager, lname)() 6143 self.run_worker(self._test_lock, o) 6144 o.release() 6145 self.assertRaises(RuntimeError, o.release) # already released 6146 6147 @classmethod 6148 def _test_rlock(cls, obj): 6149 obj.acquire() 6150 obj.release() 6151 6152 def test_rlock(self, lname="Lock"): 6153 o = getattr(self.manager, lname)() 6154 self.run_worker(self._test_rlock, o) 6155 6156 @classmethod 6157 def _test_semaphore(cls, obj): 6158 obj.acquire() 6159 6160 def test_semaphore(self, sname="Semaphore"): 6161 o = getattr(self.manager, sname)() 6162 self.run_worker(self._test_semaphore, o) 6163 o.release() 6164 6165 def test_bounded_semaphore(self): 6166 self.test_semaphore(sname="BoundedSemaphore") 6167 6168 @classmethod 6169 def _test_condition(cls, obj): 6170 obj.acquire() 6171 obj.release() 6172 6173 def test_condition(self): 6174 o = self.manager.Condition() 6175 self.run_worker(self._test_condition, o) 6176 6177 @classmethod 6178 def _test_barrier(cls, obj): 6179 assert obj.parties == 5 6180 obj.reset() 6181 6182 def test_barrier(self): 6183 o = self.manager.Barrier(5) 6184 self.run_worker(self._test_barrier, o) 6185 6186 @classmethod 6187 def _test_pool(cls, obj): 6188 # TODO: fix https://bugs.python.org/issue35919 6189 with obj: 6190 pass 6191 6192 def test_pool(self): 6193 o = self.manager.Pool(processes=4) 6194 self.run_worker(self._test_pool, o) 6195 6196 @classmethod 6197 def _test_queue(cls, obj): 6198 assert obj.qsize() == 2 6199 assert obj.full() 6200 assert not obj.empty() 6201 assert obj.get() == 5 6202 assert not obj.empty() 6203 assert obj.get() == 6 6204 assert obj.empty() 6205 6206 def test_queue(self, qname="Queue"): 6207 o = getattr(self.manager, qname)(2) 6208 o.put(5) 6209 o.put(6) 6210 self.run_worker(self._test_queue, o) 6211 assert o.empty() 6212 assert not o.full() 6213 6214 def test_joinable_queue(self): 6215 self.test_queue("JoinableQueue") 6216 6217 @classmethod 6218 def _test_list(cls, obj): 6219 case = unittest.TestCase() 6220 case.assertEqual(obj[0], 5) 6221 case.assertEqual(obj.count(5), 1) 6222 case.assertEqual(obj.index(5), 0) 6223 obj.sort() 6224 obj.reverse() 6225 for x in obj: 6226 pass 6227 case.assertEqual(len(obj), 1) 6228 case.assertEqual(obj.pop(0), 5) 6229 6230 def test_list(self): 6231 o = self.manager.list() 6232 o.append(5) 6233 self.run_worker(self._test_list, o) 6234 self.assertIsNotNone(o) 6235 self.assertEqual(len(o), 0) 6236 6237 @classmethod 6238 def _test_dict(cls, obj): 6239 case = unittest.TestCase() 6240 case.assertEqual(len(obj), 1) 6241 case.assertEqual(obj['foo'], 5) 6242 case.assertEqual(obj.get('foo'), 5) 6243 case.assertListEqual(list(obj.items()), [('foo', 5)]) 6244 case.assertListEqual(list(obj.keys()), ['foo']) 6245 case.assertListEqual(list(obj.values()), [5]) 6246 case.assertDictEqual(obj.copy(), {'foo': 5}) 6247 case.assertTupleEqual(obj.popitem(), ('foo', 5)) 6248 6249 def test_dict(self): 6250 o = self.manager.dict() 6251 o['foo'] = 5 6252 self.run_worker(self._test_dict, o) 6253 self.assertIsNotNone(o) 6254 self.assertEqual(len(o), 0) 6255 6256 @classmethod 6257 def _test_value(cls, obj): 6258 case = unittest.TestCase() 6259 case.assertEqual(obj.value, 1) 6260 case.assertEqual(obj.get(), 1) 6261 obj.set(2) 6262 6263 def test_value(self): 6264 o = self.manager.Value('i', 1) 6265 self.run_worker(self._test_value, o) 6266 self.assertEqual(o.value, 2) 6267 self.assertEqual(o.get(), 2) 6268 6269 @classmethod 6270 def _test_array(cls, obj): 6271 case = unittest.TestCase() 6272 case.assertEqual(obj[0], 0) 6273 case.assertEqual(obj[1], 1) 6274 case.assertEqual(len(obj), 2) 6275 case.assertListEqual(list(obj), [0, 1]) 6276 6277 def test_array(self): 6278 o = self.manager.Array('i', [0, 1]) 6279 self.run_worker(self._test_array, o) 6280 6281 @classmethod 6282 def _test_namespace(cls, obj): 6283 case = unittest.TestCase() 6284 case.assertEqual(obj.x, 0) 6285 case.assertEqual(obj.y, 1) 6286 6287 def test_namespace(self): 6288 o = self.manager.Namespace() 6289 o.x = 0 6290 o.y = 1 6291 self.run_worker(self._test_namespace, o) 6292 6293 6294class TestNamedResource(unittest.TestCase): 6295 @only_run_in_spawn_testsuite("spawn specific test.") 6296 def test_global_named_resource_spawn(self): 6297 # 6298 # gh-90549: Check that global named resources in main module 6299 # will not leak by a subprocess, in spawn context. 6300 # 6301 testfn = os_helper.TESTFN 6302 self.addCleanup(os_helper.unlink, testfn) 6303 with open(testfn, 'w', encoding='utf-8') as f: 6304 f.write(textwrap.dedent('''\ 6305 import multiprocessing as mp 6306 ctx = mp.get_context('spawn') 6307 global_resource = ctx.Semaphore() 6308 def submain(): pass 6309 if __name__ == '__main__': 6310 p = ctx.Process(target=submain) 6311 p.start() 6312 p.join() 6313 ''')) 6314 rc, out, err = script_helper.assert_python_ok(testfn) 6315 # on error, err = 'UserWarning: resource_tracker: There appear to 6316 # be 1 leaked semaphore objects to clean up at shutdown' 6317 self.assertFalse(err, msg=err.decode('utf-8')) 6318 6319 6320class _TestAtExit(BaseTestCase): 6321 6322 ALLOWED_TYPES = ('processes',) 6323 6324 @classmethod 6325 def _write_file_at_exit(self, output_path): 6326 import atexit 6327 def exit_handler(): 6328 with open(output_path, 'w') as f: 6329 f.write("deadbeef") 6330 atexit.register(exit_handler) 6331 6332 def test_atexit(self): 6333 # gh-83856 6334 with os_helper.temp_dir() as temp_dir: 6335 output_path = os.path.join(temp_dir, 'output.txt') 6336 p = self.Process(target=self._write_file_at_exit, args=(output_path,)) 6337 p.start() 6338 p.join() 6339 with open(output_path) as f: 6340 self.assertEqual(f.read(), 'deadbeef') 6341 6342 6343class _TestSpawnedSysPath(BaseTestCase): 6344 """Test that sys.path is setup in forkserver and spawn processes.""" 6345 6346 ALLOWED_TYPES = {'processes'} 6347 # Not applicable to fork which inherits everything from the process as is. 6348 START_METHODS = {"forkserver", "spawn"} 6349 6350 def setUp(self): 6351 self._orig_sys_path = list(sys.path) 6352 self._temp_dir = tempfile.mkdtemp(prefix="test_sys_path-") 6353 self._mod_name = "unique_test_mod" 6354 module_path = os.path.join(self._temp_dir, f"{self._mod_name}.py") 6355 with open(module_path, "w", encoding="utf-8") as mod: 6356 mod.write("# A simple test module\n") 6357 sys.path[:] = [p for p in sys.path if p] # remove any existing ""s 6358 sys.path.insert(0, self._temp_dir) 6359 sys.path.insert(0, "") # Replaced with an abspath in child. 6360 self.assertIn(self.start_method, self.START_METHODS) 6361 self._ctx = multiprocessing.get_context(self.start_method) 6362 6363 def tearDown(self): 6364 sys.path[:] = self._orig_sys_path 6365 shutil.rmtree(self._temp_dir, ignore_errors=True) 6366 6367 @staticmethod 6368 def enq_imported_module_names(queue): 6369 queue.put(tuple(sys.modules)) 6370 6371 def test_forkserver_preload_imports_sys_path(self): 6372 if self._ctx.get_start_method() != "forkserver": 6373 self.skipTest("forkserver specific test.") 6374 self.assertNotIn(self._mod_name, sys.modules) 6375 multiprocessing.forkserver._forkserver._stop() # Must be fresh. 6376 self._ctx.set_forkserver_preload( 6377 ["test.test_multiprocessing_forkserver", self._mod_name]) 6378 q = self._ctx.Queue() 6379 proc = self._ctx.Process( 6380 target=self.enq_imported_module_names, args=(q,)) 6381 proc.start() 6382 proc.join() 6383 child_imported_modules = q.get() 6384 q.close() 6385 self.assertIn(self._mod_name, child_imported_modules) 6386 6387 @staticmethod 6388 def enq_sys_path_and_import(queue, mod_name): 6389 queue.put(sys.path) 6390 try: 6391 importlib.import_module(mod_name) 6392 except ImportError as exc: 6393 queue.put(exc) 6394 else: 6395 queue.put(None) 6396 6397 def test_child_sys_path(self): 6398 q = self._ctx.Queue() 6399 proc = self._ctx.Process( 6400 target=self.enq_sys_path_and_import, args=(q, self._mod_name)) 6401 proc.start() 6402 proc.join() 6403 child_sys_path = q.get() 6404 import_error = q.get() 6405 q.close() 6406 self.assertNotIn("", child_sys_path) # replaced by an abspath 6407 self.assertIn(self._temp_dir, child_sys_path) # our addition 6408 # ignore the first element, it is the absolute "" replacement 6409 self.assertEqual(child_sys_path[1:], sys.path[1:]) 6410 self.assertIsNone(import_error, msg=f"child could not import {self._mod_name}") 6411 6412 6413class MiscTestCase(unittest.TestCase): 6414 def test__all__(self): 6415 # Just make sure names in not_exported are excluded 6416 support.check__all__(self, multiprocessing, extra=multiprocessing.__all__, 6417 not_exported=['SUBDEBUG', 'SUBWARNING']) 6418 6419 @only_run_in_spawn_testsuite("avoids redundant testing.") 6420 def test_spawn_sys_executable_none_allows_import(self): 6421 # Regression test for a bug introduced in 6422 # https://github.com/python/cpython/issues/90876 that caused an 6423 # ImportError in multiprocessing when sys.executable was None. 6424 # This can be true in embedded environments. 6425 rc, out, err = script_helper.assert_python_ok( 6426 "-c", 6427 """if 1: 6428 import sys 6429 sys.executable = None 6430 assert "multiprocessing" not in sys.modules, "already imported!" 6431 import multiprocessing 6432 import multiprocessing.spawn # This should not fail\n""", 6433 ) 6434 self.assertEqual(rc, 0) 6435 self.assertFalse(err, msg=err.decode('utf-8')) 6436 6437 def test_large_pool(self): 6438 # 6439 # gh-89240: Check that large pools are always okay 6440 # 6441 testfn = os_helper.TESTFN 6442 self.addCleanup(os_helper.unlink, testfn) 6443 with open(testfn, 'w', encoding='utf-8') as f: 6444 f.write(textwrap.dedent('''\ 6445 import multiprocessing 6446 def f(x): return x*x 6447 if __name__ == '__main__': 6448 with multiprocessing.Pool(200) as p: 6449 print(sum(p.map(f, range(1000)))) 6450 ''')) 6451 rc, out, err = script_helper.assert_python_ok(testfn) 6452 self.assertEqual("332833500", out.decode('utf-8').strip()) 6453 self.assertFalse(err, msg=err.decode('utf-8')) 6454 6455 6456# 6457# Mixins 6458# 6459 6460class BaseMixin(object): 6461 @classmethod 6462 def setUpClass(cls): 6463 cls.dangling = (multiprocessing.process._dangling.copy(), 6464 threading._dangling.copy()) 6465 6466 @classmethod 6467 def tearDownClass(cls): 6468 # bpo-26762: Some multiprocessing objects like Pool create reference 6469 # cycles. Trigger a garbage collection to break these cycles. 6470 test.support.gc_collect() 6471 6472 processes = set(multiprocessing.process._dangling) - set(cls.dangling[0]) 6473 if processes: 6474 test.support.environment_altered = True 6475 support.print_warning(f'Dangling processes: {processes}') 6476 processes = None 6477 6478 threads = set(threading._dangling) - set(cls.dangling[1]) 6479 if threads: 6480 test.support.environment_altered = True 6481 support.print_warning(f'Dangling threads: {threads}') 6482 threads = None 6483 6484 6485class ProcessesMixin(BaseMixin): 6486 TYPE = 'processes' 6487 Process = multiprocessing.Process 6488 connection = multiprocessing.connection 6489 current_process = staticmethod(multiprocessing.current_process) 6490 parent_process = staticmethod(multiprocessing.parent_process) 6491 active_children = staticmethod(multiprocessing.active_children) 6492 set_executable = staticmethod(multiprocessing.set_executable) 6493 Pool = staticmethod(multiprocessing.Pool) 6494 Pipe = staticmethod(multiprocessing.Pipe) 6495 Queue = staticmethod(multiprocessing.Queue) 6496 JoinableQueue = staticmethod(multiprocessing.JoinableQueue) 6497 Lock = staticmethod(multiprocessing.Lock) 6498 RLock = staticmethod(multiprocessing.RLock) 6499 Semaphore = staticmethod(multiprocessing.Semaphore) 6500 BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore) 6501 Condition = staticmethod(multiprocessing.Condition) 6502 Event = staticmethod(multiprocessing.Event) 6503 Barrier = staticmethod(multiprocessing.Barrier) 6504 Value = staticmethod(multiprocessing.Value) 6505 Array = staticmethod(multiprocessing.Array) 6506 RawValue = staticmethod(multiprocessing.RawValue) 6507 RawArray = staticmethod(multiprocessing.RawArray) 6508 6509 6510class ManagerMixin(BaseMixin): 6511 TYPE = 'manager' 6512 Process = multiprocessing.Process 6513 Queue = property(operator.attrgetter('manager.Queue')) 6514 JoinableQueue = property(operator.attrgetter('manager.JoinableQueue')) 6515 Lock = property(operator.attrgetter('manager.Lock')) 6516 RLock = property(operator.attrgetter('manager.RLock')) 6517 Semaphore = property(operator.attrgetter('manager.Semaphore')) 6518 BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore')) 6519 Condition = property(operator.attrgetter('manager.Condition')) 6520 Event = property(operator.attrgetter('manager.Event')) 6521 Barrier = property(operator.attrgetter('manager.Barrier')) 6522 Value = property(operator.attrgetter('manager.Value')) 6523 Array = property(operator.attrgetter('manager.Array')) 6524 list = property(operator.attrgetter('manager.list')) 6525 dict = property(operator.attrgetter('manager.dict')) 6526 Namespace = property(operator.attrgetter('manager.Namespace')) 6527 6528 @classmethod 6529 def Pool(cls, *args, **kwds): 6530 return cls.manager.Pool(*args, **kwds) 6531 6532 @classmethod 6533 def setUpClass(cls): 6534 super().setUpClass() 6535 cls.manager = multiprocessing.Manager() 6536 6537 @classmethod 6538 def tearDownClass(cls): 6539 # only the manager process should be returned by active_children() 6540 # but this can take a bit on slow machines, so wait a few seconds 6541 # if there are other children too (see #17395) 6542 timeout = WAIT_ACTIVE_CHILDREN_TIMEOUT 6543 start_time = time.monotonic() 6544 for _ in support.sleeping_retry(timeout, error=False): 6545 if len(multiprocessing.active_children()) <= 1: 6546 break 6547 else: 6548 dt = time.monotonic() - start_time 6549 support.environment_altered = True 6550 support.print_warning(f"multiprocessing.Manager still has " 6551 f"{multiprocessing.active_children()} " 6552 f"active children after {dt:.1f} seconds") 6553 6554 gc.collect() # do garbage collection 6555 if cls.manager._number_of_objects() != 0: 6556 # This is not really an error since some tests do not 6557 # ensure that all processes which hold a reference to a 6558 # managed object have been joined. 6559 test.support.environment_altered = True 6560 support.print_warning('Shared objects which still exist ' 6561 'at manager shutdown:') 6562 support.print_warning(cls.manager._debug_info()) 6563 cls.manager.shutdown() 6564 cls.manager.join() 6565 cls.manager = None 6566 6567 super().tearDownClass() 6568 6569 6570class ThreadsMixin(BaseMixin): 6571 TYPE = 'threads' 6572 Process = multiprocessing.dummy.Process 6573 connection = multiprocessing.dummy.connection 6574 current_process = staticmethod(multiprocessing.dummy.current_process) 6575 active_children = staticmethod(multiprocessing.dummy.active_children) 6576 Pool = staticmethod(multiprocessing.dummy.Pool) 6577 Pipe = staticmethod(multiprocessing.dummy.Pipe) 6578 Queue = staticmethod(multiprocessing.dummy.Queue) 6579 JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue) 6580 Lock = staticmethod(multiprocessing.dummy.Lock) 6581 RLock = staticmethod(multiprocessing.dummy.RLock) 6582 Semaphore = staticmethod(multiprocessing.dummy.Semaphore) 6583 BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore) 6584 Condition = staticmethod(multiprocessing.dummy.Condition) 6585 Event = staticmethod(multiprocessing.dummy.Event) 6586 Barrier = staticmethod(multiprocessing.dummy.Barrier) 6587 Value = staticmethod(multiprocessing.dummy.Value) 6588 Array = staticmethod(multiprocessing.dummy.Array) 6589 6590# 6591# Functions used to create test cases from the base ones in this module 6592# 6593 6594def install_tests_in_module_dict(remote_globs, start_method, 6595 only_type=None, exclude_types=False): 6596 __module__ = remote_globs['__name__'] 6597 local_globs = globals() 6598 ALL_TYPES = {'processes', 'threads', 'manager'} 6599 6600 for name, base in local_globs.items(): 6601 if not isinstance(base, type): 6602 continue 6603 if issubclass(base, BaseTestCase): 6604 if base is BaseTestCase: 6605 continue 6606 assert set(base.ALLOWED_TYPES) <= ALL_TYPES, base.ALLOWED_TYPES 6607 if base.START_METHODS and start_method not in base.START_METHODS: 6608 continue # class not intended for this start method. 6609 for type_ in base.ALLOWED_TYPES: 6610 if only_type and type_ != only_type: 6611 continue 6612 if exclude_types: 6613 continue 6614 newname = 'With' + type_.capitalize() + name[1:] 6615 Mixin = local_globs[type_.capitalize() + 'Mixin'] 6616 class Temp(base, Mixin, unittest.TestCase): 6617 pass 6618 if type_ == 'manager': 6619 Temp = hashlib_helper.requires_hashdigest('sha256')(Temp) 6620 Temp.__name__ = Temp.__qualname__ = newname 6621 Temp.__module__ = __module__ 6622 Temp.start_method = start_method 6623 remote_globs[newname] = Temp 6624 elif issubclass(base, unittest.TestCase): 6625 if only_type: 6626 continue 6627 6628 class Temp(base, object): 6629 pass 6630 Temp.__name__ = Temp.__qualname__ = name 6631 Temp.__module__ = __module__ 6632 remote_globs[name] = Temp 6633 6634 dangling = [None, None] 6635 old_start_method = [None] 6636 6637 def setUpModule(): 6638 multiprocessing.set_forkserver_preload(PRELOAD) 6639 multiprocessing.process._cleanup() 6640 dangling[0] = multiprocessing.process._dangling.copy() 6641 dangling[1] = threading._dangling.copy() 6642 old_start_method[0] = multiprocessing.get_start_method(allow_none=True) 6643 try: 6644 multiprocessing.set_start_method(start_method, force=True) 6645 except ValueError: 6646 raise unittest.SkipTest(start_method + 6647 ' start method not supported') 6648 6649 if sys.platform.startswith("linux"): 6650 try: 6651 lock = multiprocessing.RLock() 6652 except OSError: 6653 raise unittest.SkipTest("OSError raises on RLock creation, " 6654 "see issue 3111!") 6655 check_enough_semaphores() 6656 util.get_temp_dir() # creates temp directory 6657 multiprocessing.get_logger().setLevel(LOG_LEVEL) 6658 6659 def tearDownModule(): 6660 need_sleep = False 6661 6662 # bpo-26762: Some multiprocessing objects like Pool create reference 6663 # cycles. Trigger a garbage collection to break these cycles. 6664 test.support.gc_collect() 6665 6666 multiprocessing.set_start_method(old_start_method[0], force=True) 6667 # pause a bit so we don't get warning about dangling threads/processes 6668 processes = set(multiprocessing.process._dangling) - set(dangling[0]) 6669 if processes: 6670 need_sleep = True 6671 test.support.environment_altered = True 6672 support.print_warning(f'Dangling processes: {processes}') 6673 processes = None 6674 6675 threads = set(threading._dangling) - set(dangling[1]) 6676 if threads: 6677 need_sleep = True 6678 test.support.environment_altered = True 6679 support.print_warning(f'Dangling threads: {threads}') 6680 threads = None 6681 6682 # Sleep 500 ms to give time to child processes to complete. 6683 if need_sleep: 6684 time.sleep(0.5) 6685 6686 multiprocessing.util._cleanup_tests() 6687 6688 remote_globs['setUpModule'] = setUpModule 6689 remote_globs['tearDownModule'] = tearDownModule 6690 6691 6692@unittest.skipIf(not hasattr(_multiprocessing, 'SemLock'), 'SemLock not available') 6693@unittest.skipIf(sys.platform != "linux", "Linux only") 6694class SemLockTests(unittest.TestCase): 6695 6696 def test_semlock_subclass(self): 6697 class SemLock(_multiprocessing.SemLock): 6698 pass 6699 name = f'test_semlock_subclass-{os.getpid()}' 6700 s = SemLock(1, 0, 10, name, False) 6701 _multiprocessing.sem_unlink(name) 6702