1# 2# Unit tests for the multiprocessing package 3# 4 5import unittest 6import unittest.mock 7import queue as pyqueue 8import time 9import io 10import itertools 11import sys 12import os 13import gc 14import errno 15import signal 16import array 17import socket 18import random 19import logging 20import subprocess 21import struct 22import operator 23import pickle 24import weakref 25import warnings 26import test.support 27import test.support.script_helper 28from test import support 29 30 31# Skip tests if _multiprocessing wasn't built. 32_multiprocessing = test.support.import_module('_multiprocessing') 33# Skip tests if sem_open implementation is broken. 34support.skip_if_broken_multiprocessing_synchronize() 35import threading 36 37import multiprocessing.connection 38import multiprocessing.dummy 39import multiprocessing.heap 40import multiprocessing.managers 41import multiprocessing.pool 42import multiprocessing.queues 43 44from multiprocessing import util 45 46try: 47 from multiprocessing import reduction 48 HAS_REDUCTION = reduction.HAVE_SEND_HANDLE 49except ImportError: 50 HAS_REDUCTION = False 51 52try: 53 from multiprocessing.sharedctypes import Value, copy 54 HAS_SHAREDCTYPES = True 55except ImportError: 56 HAS_SHAREDCTYPES = False 57 58try: 59 from multiprocessing import shared_memory 60 HAS_SHMEM = True 61except ImportError: 62 HAS_SHMEM = False 63 64try: 65 import msvcrt 66except ImportError: 67 msvcrt = None 68 69# 70# 71# 72 73# Timeout to wait until a process completes 74TIMEOUT = 60.0 # seconds 75 76def latin(s): 77 return s.encode('latin') 78 79 80def close_queue(queue): 81 if isinstance(queue, multiprocessing.queues.Queue): 82 queue.close() 83 queue.join_thread() 84 85 86def join_process(process): 87 # Since multiprocessing.Process has the same API than threading.Thread 88 # (join() and is_alive(), the support function can be reused 89 support.join_thread(process, timeout=TIMEOUT) 90 91 92if os.name == "posix": 93 from multiprocessing import resource_tracker 94 95 def _resource_unlink(name, rtype): 96 resource_tracker._CLEANUP_FUNCS[rtype](name) 97 98 99# 100# Constants 101# 102 103LOG_LEVEL = util.SUBWARNING 104#LOG_LEVEL = logging.DEBUG 105 106DELTA = 0.1 107CHECK_TIMINGS = False # making true makes tests take a lot longer 108 # and can sometimes cause some non-serious 109 # failures because some calls block a bit 110 # longer than expected 111if CHECK_TIMINGS: 112 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4 113else: 114 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1 115 116HAVE_GETVALUE = not getattr(_multiprocessing, 117 'HAVE_BROKEN_SEM_GETVALUE', False) 118 119WIN32 = (sys.platform == "win32") 120 121from multiprocessing.connection import wait 122 123def wait_for_handle(handle, timeout): 124 if timeout is not None and timeout < 0.0: 125 timeout = None 126 return wait([handle], timeout) 127 128try: 129 MAXFD = os.sysconf("SC_OPEN_MAX") 130except: 131 MAXFD = 256 132 133# To speed up tests when using the forkserver, we can preload these: 134PRELOAD = ['__main__', 'test.test_multiprocessing_forkserver'] 135 136# 137# Some tests require ctypes 138# 139 140try: 141 from ctypes import Structure, c_int, c_double, c_longlong 142except ImportError: 143 Structure = object 144 c_int = c_double = c_longlong = None 145 146 147def check_enough_semaphores(): 148 """Check that the system supports enough semaphores to run the test.""" 149 # minimum number of semaphores available according to POSIX 150 nsems_min = 256 151 try: 152 nsems = os.sysconf("SC_SEM_NSEMS_MAX") 153 except (AttributeError, ValueError): 154 # sysconf not available or setting not available 155 return 156 if nsems == -1 or nsems >= nsems_min: 157 return 158 raise unittest.SkipTest("The OS doesn't support enough semaphores " 159 "to run the test (required: %d)." % nsems_min) 160 161 162# 163# Creates a wrapper for a function which records the time it takes to finish 164# 165 166class TimingWrapper(object): 167 168 def __init__(self, func): 169 self.func = func 170 self.elapsed = None 171 172 def __call__(self, *args, **kwds): 173 t = time.monotonic() 174 try: 175 return self.func(*args, **kwds) 176 finally: 177 self.elapsed = time.monotonic() - t 178 179# 180# Base class for test cases 181# 182 183class BaseTestCase(object): 184 185 ALLOWED_TYPES = ('processes', 'manager', 'threads') 186 187 def assertTimingAlmostEqual(self, a, b): 188 if CHECK_TIMINGS: 189 self.assertAlmostEqual(a, b, 1) 190 191 def assertReturnsIfImplemented(self, value, func, *args): 192 try: 193 res = func(*args) 194 except NotImplementedError: 195 pass 196 else: 197 return self.assertEqual(value, res) 198 199 # For the sanity of Windows users, rather than crashing or freezing in 200 # multiple ways. 201 def __reduce__(self, *args): 202 raise NotImplementedError("shouldn't try to pickle a test case") 203 204 __reduce_ex__ = __reduce__ 205 206# 207# Return the value of a semaphore 208# 209 210def get_value(self): 211 try: 212 return self.get_value() 213 except AttributeError: 214 try: 215 return self._Semaphore__value 216 except AttributeError: 217 try: 218 return self._value 219 except AttributeError: 220 raise NotImplementedError 221 222# 223# Testcases 224# 225 226class DummyCallable: 227 def __call__(self, q, c): 228 assert isinstance(c, DummyCallable) 229 q.put(5) 230 231 232class _TestProcess(BaseTestCase): 233 234 ALLOWED_TYPES = ('processes', 'threads') 235 236 def test_current(self): 237 if self.TYPE == 'threads': 238 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 239 240 current = self.current_process() 241 authkey = current.authkey 242 243 self.assertTrue(current.is_alive()) 244 self.assertTrue(not current.daemon) 245 self.assertIsInstance(authkey, bytes) 246 self.assertTrue(len(authkey) > 0) 247 self.assertEqual(current.ident, os.getpid()) 248 self.assertEqual(current.exitcode, None) 249 250 def test_daemon_argument(self): 251 if self.TYPE == "threads": 252 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 253 254 # By default uses the current process's daemon flag. 255 proc0 = self.Process(target=self._test) 256 self.assertEqual(proc0.daemon, self.current_process().daemon) 257 proc1 = self.Process(target=self._test, daemon=True) 258 self.assertTrue(proc1.daemon) 259 proc2 = self.Process(target=self._test, daemon=False) 260 self.assertFalse(proc2.daemon) 261 262 @classmethod 263 def _test(cls, q, *args, **kwds): 264 current = cls.current_process() 265 q.put(args) 266 q.put(kwds) 267 q.put(current.name) 268 if cls.TYPE != 'threads': 269 q.put(bytes(current.authkey)) 270 q.put(current.pid) 271 272 def test_parent_process_attributes(self): 273 if self.TYPE == "threads": 274 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 275 276 self.assertIsNone(self.parent_process()) 277 278 rconn, wconn = self.Pipe(duplex=False) 279 p = self.Process(target=self._test_send_parent_process, args=(wconn,)) 280 p.start() 281 p.join() 282 parent_pid, parent_name = rconn.recv() 283 self.assertEqual(parent_pid, self.current_process().pid) 284 self.assertEqual(parent_pid, os.getpid()) 285 self.assertEqual(parent_name, self.current_process().name) 286 287 @classmethod 288 def _test_send_parent_process(cls, wconn): 289 from multiprocessing.process import parent_process 290 wconn.send([parent_process().pid, parent_process().name]) 291 292 def test_parent_process(self): 293 if self.TYPE == "threads": 294 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 295 296 # Launch a child process. Make it launch a grandchild process. Kill the 297 # child process and make sure that the grandchild notices the death of 298 # its parent (a.k.a the child process). 299 rconn, wconn = self.Pipe(duplex=False) 300 p = self.Process( 301 target=self._test_create_grandchild_process, args=(wconn, )) 302 p.start() 303 304 if not rconn.poll(timeout=60): 305 raise AssertionError("Could not communicate with child process") 306 parent_process_status = rconn.recv() 307 self.assertEqual(parent_process_status, "alive") 308 309 p.terminate() 310 p.join() 311 312 if not rconn.poll(timeout=60): 313 raise AssertionError("Could not communicate with child process") 314 parent_process_status = rconn.recv() 315 self.assertEqual(parent_process_status, "not alive") 316 317 @classmethod 318 def _test_create_grandchild_process(cls, wconn): 319 p = cls.Process(target=cls._test_report_parent_status, args=(wconn, )) 320 p.start() 321 time.sleep(300) 322 323 @classmethod 324 def _test_report_parent_status(cls, wconn): 325 from multiprocessing.process import parent_process 326 wconn.send("alive" if parent_process().is_alive() else "not alive") 327 parent_process().join(timeout=5) 328 wconn.send("alive" if parent_process().is_alive() else "not alive") 329 330 def test_process(self): 331 q = self.Queue(1) 332 e = self.Event() 333 args = (q, 1, 2) 334 kwargs = {'hello':23, 'bye':2.54} 335 name = 'SomeProcess' 336 p = self.Process( 337 target=self._test, args=args, kwargs=kwargs, name=name 338 ) 339 p.daemon = True 340 current = self.current_process() 341 342 if self.TYPE != 'threads': 343 self.assertEqual(p.authkey, current.authkey) 344 self.assertEqual(p.is_alive(), False) 345 self.assertEqual(p.daemon, True) 346 self.assertNotIn(p, self.active_children()) 347 self.assertTrue(type(self.active_children()) is list) 348 self.assertEqual(p.exitcode, None) 349 350 p.start() 351 352 self.assertEqual(p.exitcode, None) 353 self.assertEqual(p.is_alive(), True) 354 self.assertIn(p, self.active_children()) 355 356 self.assertEqual(q.get(), args[1:]) 357 self.assertEqual(q.get(), kwargs) 358 self.assertEqual(q.get(), p.name) 359 if self.TYPE != 'threads': 360 self.assertEqual(q.get(), current.authkey) 361 self.assertEqual(q.get(), p.pid) 362 363 p.join() 364 365 self.assertEqual(p.exitcode, 0) 366 self.assertEqual(p.is_alive(), False) 367 self.assertNotIn(p, self.active_children()) 368 close_queue(q) 369 370 @unittest.skipUnless(threading._HAVE_THREAD_NATIVE_ID, "needs native_id") 371 def test_process_mainthread_native_id(self): 372 if self.TYPE == 'threads': 373 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 374 375 current_mainthread_native_id = threading.main_thread().native_id 376 377 q = self.Queue(1) 378 p = self.Process(target=self._test_process_mainthread_native_id, args=(q,)) 379 p.start() 380 381 child_mainthread_native_id = q.get() 382 p.join() 383 close_queue(q) 384 385 self.assertNotEqual(current_mainthread_native_id, child_mainthread_native_id) 386 387 @classmethod 388 def _test_process_mainthread_native_id(cls, q): 389 mainthread_native_id = threading.main_thread().native_id 390 q.put(mainthread_native_id) 391 392 @classmethod 393 def _sleep_some(cls): 394 time.sleep(100) 395 396 @classmethod 397 def _test_sleep(cls, delay): 398 time.sleep(delay) 399 400 def _kill_process(self, meth): 401 if self.TYPE == 'threads': 402 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 403 404 p = self.Process(target=self._sleep_some) 405 p.daemon = True 406 p.start() 407 408 self.assertEqual(p.is_alive(), True) 409 self.assertIn(p, self.active_children()) 410 self.assertEqual(p.exitcode, None) 411 412 join = TimingWrapper(p.join) 413 414 self.assertEqual(join(0), None) 415 self.assertTimingAlmostEqual(join.elapsed, 0.0) 416 self.assertEqual(p.is_alive(), True) 417 418 self.assertEqual(join(-1), None) 419 self.assertTimingAlmostEqual(join.elapsed, 0.0) 420 self.assertEqual(p.is_alive(), True) 421 422 # XXX maybe terminating too soon causes the problems on Gentoo... 423 time.sleep(1) 424 425 meth(p) 426 427 if hasattr(signal, 'alarm'): 428 # On the Gentoo buildbot waitpid() often seems to block forever. 429 # We use alarm() to interrupt it if it blocks for too long. 430 def handler(*args): 431 raise RuntimeError('join took too long: %s' % p) 432 old_handler = signal.signal(signal.SIGALRM, handler) 433 try: 434 signal.alarm(10) 435 self.assertEqual(join(), None) 436 finally: 437 signal.alarm(0) 438 signal.signal(signal.SIGALRM, old_handler) 439 else: 440 self.assertEqual(join(), None) 441 442 self.assertTimingAlmostEqual(join.elapsed, 0.0) 443 444 self.assertEqual(p.is_alive(), False) 445 self.assertNotIn(p, self.active_children()) 446 447 p.join() 448 449 return p.exitcode 450 451 def test_terminate(self): 452 exitcode = self._kill_process(multiprocessing.Process.terminate) 453 if os.name != 'nt': 454 self.assertEqual(exitcode, -signal.SIGTERM) 455 456 def test_kill(self): 457 exitcode = self._kill_process(multiprocessing.Process.kill) 458 if os.name != 'nt': 459 self.assertEqual(exitcode, -signal.SIGKILL) 460 461 def test_cpu_count(self): 462 try: 463 cpus = multiprocessing.cpu_count() 464 except NotImplementedError: 465 cpus = 1 466 self.assertTrue(type(cpus) is int) 467 self.assertTrue(cpus >= 1) 468 469 def test_active_children(self): 470 self.assertEqual(type(self.active_children()), list) 471 472 p = self.Process(target=time.sleep, args=(DELTA,)) 473 self.assertNotIn(p, self.active_children()) 474 475 p.daemon = True 476 p.start() 477 self.assertIn(p, self.active_children()) 478 479 p.join() 480 self.assertNotIn(p, self.active_children()) 481 482 @classmethod 483 def _test_recursion(cls, wconn, id): 484 wconn.send(id) 485 if len(id) < 2: 486 for i in range(2): 487 p = cls.Process( 488 target=cls._test_recursion, args=(wconn, id+[i]) 489 ) 490 p.start() 491 p.join() 492 493 def test_recursion(self): 494 rconn, wconn = self.Pipe(duplex=False) 495 self._test_recursion(wconn, []) 496 497 time.sleep(DELTA) 498 result = [] 499 while rconn.poll(): 500 result.append(rconn.recv()) 501 502 expected = [ 503 [], 504 [0], 505 [0, 0], 506 [0, 1], 507 [1], 508 [1, 0], 509 [1, 1] 510 ] 511 self.assertEqual(result, expected) 512 513 @classmethod 514 def _test_sentinel(cls, event): 515 event.wait(10.0) 516 517 def test_sentinel(self): 518 if self.TYPE == "threads": 519 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 520 event = self.Event() 521 p = self.Process(target=self._test_sentinel, args=(event,)) 522 with self.assertRaises(ValueError): 523 p.sentinel 524 p.start() 525 self.addCleanup(p.join) 526 sentinel = p.sentinel 527 self.assertIsInstance(sentinel, int) 528 self.assertFalse(wait_for_handle(sentinel, timeout=0.0)) 529 event.set() 530 p.join() 531 self.assertTrue(wait_for_handle(sentinel, timeout=1)) 532 533 @classmethod 534 def _test_close(cls, rc=0, q=None): 535 if q is not None: 536 q.get() 537 sys.exit(rc) 538 539 def test_close(self): 540 if self.TYPE == "threads": 541 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 542 q = self.Queue() 543 p = self.Process(target=self._test_close, kwargs={'q': q}) 544 p.daemon = True 545 p.start() 546 self.assertEqual(p.is_alive(), True) 547 # Child is still alive, cannot close 548 with self.assertRaises(ValueError): 549 p.close() 550 551 q.put(None) 552 p.join() 553 self.assertEqual(p.is_alive(), False) 554 self.assertEqual(p.exitcode, 0) 555 p.close() 556 with self.assertRaises(ValueError): 557 p.is_alive() 558 with self.assertRaises(ValueError): 559 p.join() 560 with self.assertRaises(ValueError): 561 p.terminate() 562 p.close() 563 564 wr = weakref.ref(p) 565 del p 566 gc.collect() 567 self.assertIs(wr(), None) 568 569 close_queue(q) 570 571 def test_many_processes(self): 572 if self.TYPE == 'threads': 573 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 574 575 sm = multiprocessing.get_start_method() 576 N = 5 if sm == 'spawn' else 100 577 578 # Try to overwhelm the forkserver loop with events 579 procs = [self.Process(target=self._test_sleep, args=(0.01,)) 580 for i in range(N)] 581 for p in procs: 582 p.start() 583 for p in procs: 584 join_process(p) 585 for p in procs: 586 self.assertEqual(p.exitcode, 0) 587 588 procs = [self.Process(target=self._sleep_some) 589 for i in range(N)] 590 for p in procs: 591 p.start() 592 time.sleep(0.001) # let the children start... 593 for p in procs: 594 p.terminate() 595 for p in procs: 596 join_process(p) 597 if os.name != 'nt': 598 exitcodes = [-signal.SIGTERM] 599 if sys.platform == 'darwin': 600 # bpo-31510: On macOS, killing a freshly started process with 601 # SIGTERM sometimes kills the process with SIGKILL. 602 exitcodes.append(-signal.SIGKILL) 603 for p in procs: 604 self.assertIn(p.exitcode, exitcodes) 605 606 def test_lose_target_ref(self): 607 c = DummyCallable() 608 wr = weakref.ref(c) 609 q = self.Queue() 610 p = self.Process(target=c, args=(q, c)) 611 del c 612 p.start() 613 p.join() 614 self.assertIs(wr(), None) 615 self.assertEqual(q.get(), 5) 616 close_queue(q) 617 618 @classmethod 619 def _test_child_fd_inflation(self, evt, q): 620 q.put(test.support.fd_count()) 621 evt.wait() 622 623 def test_child_fd_inflation(self): 624 # Number of fds in child processes should not grow with the 625 # number of running children. 626 if self.TYPE == 'threads': 627 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 628 629 sm = multiprocessing.get_start_method() 630 if sm == 'fork': 631 # The fork method by design inherits all fds from the parent, 632 # trying to go against it is a lost battle 633 self.skipTest('test not appropriate for {}'.format(sm)) 634 635 N = 5 636 evt = self.Event() 637 q = self.Queue() 638 639 procs = [self.Process(target=self._test_child_fd_inflation, args=(evt, q)) 640 for i in range(N)] 641 for p in procs: 642 p.start() 643 644 try: 645 fd_counts = [q.get() for i in range(N)] 646 self.assertEqual(len(set(fd_counts)), 1, fd_counts) 647 648 finally: 649 evt.set() 650 for p in procs: 651 p.join() 652 close_queue(q) 653 654 @classmethod 655 def _test_wait_for_threads(self, evt): 656 def func1(): 657 time.sleep(0.5) 658 evt.set() 659 660 def func2(): 661 time.sleep(20) 662 evt.clear() 663 664 threading.Thread(target=func1).start() 665 threading.Thread(target=func2, daemon=True).start() 666 667 def test_wait_for_threads(self): 668 # A child process should wait for non-daemonic threads to end 669 # before exiting 670 if self.TYPE == 'threads': 671 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 672 673 evt = self.Event() 674 proc = self.Process(target=self._test_wait_for_threads, args=(evt,)) 675 proc.start() 676 proc.join() 677 self.assertTrue(evt.is_set()) 678 679 @classmethod 680 def _test_error_on_stdio_flush(self, evt, break_std_streams={}): 681 for stream_name, action in break_std_streams.items(): 682 if action == 'close': 683 stream = io.StringIO() 684 stream.close() 685 else: 686 assert action == 'remove' 687 stream = None 688 setattr(sys, stream_name, None) 689 evt.set() 690 691 def test_error_on_stdio_flush_1(self): 692 # Check that Process works with broken standard streams 693 streams = [io.StringIO(), None] 694 streams[0].close() 695 for stream_name in ('stdout', 'stderr'): 696 for stream in streams: 697 old_stream = getattr(sys, stream_name) 698 setattr(sys, stream_name, stream) 699 try: 700 evt = self.Event() 701 proc = self.Process(target=self._test_error_on_stdio_flush, 702 args=(evt,)) 703 proc.start() 704 proc.join() 705 self.assertTrue(evt.is_set()) 706 self.assertEqual(proc.exitcode, 0) 707 finally: 708 setattr(sys, stream_name, old_stream) 709 710 def test_error_on_stdio_flush_2(self): 711 # Same as test_error_on_stdio_flush_1(), but standard streams are 712 # broken by the child process 713 for stream_name in ('stdout', 'stderr'): 714 for action in ('close', 'remove'): 715 old_stream = getattr(sys, stream_name) 716 try: 717 evt = self.Event() 718 proc = self.Process(target=self._test_error_on_stdio_flush, 719 args=(evt, {stream_name: action})) 720 proc.start() 721 proc.join() 722 self.assertTrue(evt.is_set()) 723 self.assertEqual(proc.exitcode, 0) 724 finally: 725 setattr(sys, stream_name, old_stream) 726 727 @classmethod 728 def _sleep_and_set_event(self, evt, delay=0.0): 729 time.sleep(delay) 730 evt.set() 731 732 def check_forkserver_death(self, signum): 733 # bpo-31308: if the forkserver process has died, we should still 734 # be able to create and run new Process instances (the forkserver 735 # is implicitly restarted). 736 if self.TYPE == 'threads': 737 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 738 sm = multiprocessing.get_start_method() 739 if sm != 'forkserver': 740 # The fork method by design inherits all fds from the parent, 741 # trying to go against it is a lost battle 742 self.skipTest('test not appropriate for {}'.format(sm)) 743 744 from multiprocessing.forkserver import _forkserver 745 _forkserver.ensure_running() 746 747 # First process sleeps 500 ms 748 delay = 0.5 749 750 evt = self.Event() 751 proc = self.Process(target=self._sleep_and_set_event, args=(evt, delay)) 752 proc.start() 753 754 pid = _forkserver._forkserver_pid 755 os.kill(pid, signum) 756 # give time to the fork server to die and time to proc to complete 757 time.sleep(delay * 2.0) 758 759 evt2 = self.Event() 760 proc2 = self.Process(target=self._sleep_and_set_event, args=(evt2,)) 761 proc2.start() 762 proc2.join() 763 self.assertTrue(evt2.is_set()) 764 self.assertEqual(proc2.exitcode, 0) 765 766 proc.join() 767 self.assertTrue(evt.is_set()) 768 self.assertIn(proc.exitcode, (0, 255)) 769 770 def test_forkserver_sigint(self): 771 # Catchable signal 772 self.check_forkserver_death(signal.SIGINT) 773 774 def test_forkserver_sigkill(self): 775 # Uncatchable signal 776 if os.name != 'nt': 777 self.check_forkserver_death(signal.SIGKILL) 778 779 780# 781# 782# 783 784class _UpperCaser(multiprocessing.Process): 785 786 def __init__(self): 787 multiprocessing.Process.__init__(self) 788 self.child_conn, self.parent_conn = multiprocessing.Pipe() 789 790 def run(self): 791 self.parent_conn.close() 792 for s in iter(self.child_conn.recv, None): 793 self.child_conn.send(s.upper()) 794 self.child_conn.close() 795 796 def submit(self, s): 797 assert type(s) is str 798 self.parent_conn.send(s) 799 return self.parent_conn.recv() 800 801 def stop(self): 802 self.parent_conn.send(None) 803 self.parent_conn.close() 804 self.child_conn.close() 805 806class _TestSubclassingProcess(BaseTestCase): 807 808 ALLOWED_TYPES = ('processes',) 809 810 def test_subclassing(self): 811 uppercaser = _UpperCaser() 812 uppercaser.daemon = True 813 uppercaser.start() 814 self.assertEqual(uppercaser.submit('hello'), 'HELLO') 815 self.assertEqual(uppercaser.submit('world'), 'WORLD') 816 uppercaser.stop() 817 uppercaser.join() 818 819 def test_stderr_flush(self): 820 # sys.stderr is flushed at process shutdown (issue #13812) 821 if self.TYPE == "threads": 822 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 823 824 testfn = test.support.TESTFN 825 self.addCleanup(test.support.unlink, testfn) 826 proc = self.Process(target=self._test_stderr_flush, args=(testfn,)) 827 proc.start() 828 proc.join() 829 with open(testfn, 'r') as f: 830 err = f.read() 831 # The whole traceback was printed 832 self.assertIn("ZeroDivisionError", err) 833 self.assertIn("test_multiprocessing.py", err) 834 self.assertIn("1/0 # MARKER", err) 835 836 @classmethod 837 def _test_stderr_flush(cls, testfn): 838 fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL) 839 sys.stderr = open(fd, 'w', closefd=False) 840 1/0 # MARKER 841 842 843 @classmethod 844 def _test_sys_exit(cls, reason, testfn): 845 fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL) 846 sys.stderr = open(fd, 'w', closefd=False) 847 sys.exit(reason) 848 849 def test_sys_exit(self): 850 # See Issue 13854 851 if self.TYPE == 'threads': 852 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 853 854 testfn = test.support.TESTFN 855 self.addCleanup(test.support.unlink, testfn) 856 857 for reason in ( 858 [1, 2, 3], 859 'ignore this', 860 ): 861 p = self.Process(target=self._test_sys_exit, args=(reason, testfn)) 862 p.daemon = True 863 p.start() 864 join_process(p) 865 self.assertEqual(p.exitcode, 1) 866 867 with open(testfn, 'r') as f: 868 content = f.read() 869 self.assertEqual(content.rstrip(), str(reason)) 870 871 os.unlink(testfn) 872 873 for reason in (True, False, 8): 874 p = self.Process(target=sys.exit, args=(reason,)) 875 p.daemon = True 876 p.start() 877 join_process(p) 878 self.assertEqual(p.exitcode, reason) 879 880# 881# 882# 883 884def queue_empty(q): 885 if hasattr(q, 'empty'): 886 return q.empty() 887 else: 888 return q.qsize() == 0 889 890def queue_full(q, maxsize): 891 if hasattr(q, 'full'): 892 return q.full() 893 else: 894 return q.qsize() == maxsize 895 896 897class _TestQueue(BaseTestCase): 898 899 900 @classmethod 901 def _test_put(cls, queue, child_can_start, parent_can_continue): 902 child_can_start.wait() 903 for i in range(6): 904 queue.get() 905 parent_can_continue.set() 906 907 def test_put(self): 908 MAXSIZE = 6 909 queue = self.Queue(maxsize=MAXSIZE) 910 child_can_start = self.Event() 911 parent_can_continue = self.Event() 912 913 proc = self.Process( 914 target=self._test_put, 915 args=(queue, child_can_start, parent_can_continue) 916 ) 917 proc.daemon = True 918 proc.start() 919 920 self.assertEqual(queue_empty(queue), True) 921 self.assertEqual(queue_full(queue, MAXSIZE), False) 922 923 queue.put(1) 924 queue.put(2, True) 925 queue.put(3, True, None) 926 queue.put(4, False) 927 queue.put(5, False, None) 928 queue.put_nowait(6) 929 930 # the values may be in buffer but not yet in pipe so sleep a bit 931 time.sleep(DELTA) 932 933 self.assertEqual(queue_empty(queue), False) 934 self.assertEqual(queue_full(queue, MAXSIZE), True) 935 936 put = TimingWrapper(queue.put) 937 put_nowait = TimingWrapper(queue.put_nowait) 938 939 self.assertRaises(pyqueue.Full, put, 7, False) 940 self.assertTimingAlmostEqual(put.elapsed, 0) 941 942 self.assertRaises(pyqueue.Full, put, 7, False, None) 943 self.assertTimingAlmostEqual(put.elapsed, 0) 944 945 self.assertRaises(pyqueue.Full, put_nowait, 7) 946 self.assertTimingAlmostEqual(put_nowait.elapsed, 0) 947 948 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1) 949 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1) 950 951 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2) 952 self.assertTimingAlmostEqual(put.elapsed, 0) 953 954 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3) 955 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3) 956 957 child_can_start.set() 958 parent_can_continue.wait() 959 960 self.assertEqual(queue_empty(queue), True) 961 self.assertEqual(queue_full(queue, MAXSIZE), False) 962 963 proc.join() 964 close_queue(queue) 965 966 @classmethod 967 def _test_get(cls, queue, child_can_start, parent_can_continue): 968 child_can_start.wait() 969 #queue.put(1) 970 queue.put(2) 971 queue.put(3) 972 queue.put(4) 973 queue.put(5) 974 parent_can_continue.set() 975 976 def test_get(self): 977 queue = self.Queue() 978 child_can_start = self.Event() 979 parent_can_continue = self.Event() 980 981 proc = self.Process( 982 target=self._test_get, 983 args=(queue, child_can_start, parent_can_continue) 984 ) 985 proc.daemon = True 986 proc.start() 987 988 self.assertEqual(queue_empty(queue), True) 989 990 child_can_start.set() 991 parent_can_continue.wait() 992 993 time.sleep(DELTA) 994 self.assertEqual(queue_empty(queue), False) 995 996 # Hangs unexpectedly, remove for now 997 #self.assertEqual(queue.get(), 1) 998 self.assertEqual(queue.get(True, None), 2) 999 self.assertEqual(queue.get(True), 3) 1000 self.assertEqual(queue.get(timeout=1), 4) 1001 self.assertEqual(queue.get_nowait(), 5) 1002 1003 self.assertEqual(queue_empty(queue), True) 1004 1005 get = TimingWrapper(queue.get) 1006 get_nowait = TimingWrapper(queue.get_nowait) 1007 1008 self.assertRaises(pyqueue.Empty, get, False) 1009 self.assertTimingAlmostEqual(get.elapsed, 0) 1010 1011 self.assertRaises(pyqueue.Empty, get, False, None) 1012 self.assertTimingAlmostEqual(get.elapsed, 0) 1013 1014 self.assertRaises(pyqueue.Empty, get_nowait) 1015 self.assertTimingAlmostEqual(get_nowait.elapsed, 0) 1016 1017 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1) 1018 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1) 1019 1020 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2) 1021 self.assertTimingAlmostEqual(get.elapsed, 0) 1022 1023 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3) 1024 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3) 1025 1026 proc.join() 1027 close_queue(queue) 1028 1029 @classmethod 1030 def _test_fork(cls, queue): 1031 for i in range(10, 20): 1032 queue.put(i) 1033 # note that at this point the items may only be buffered, so the 1034 # process cannot shutdown until the feeder thread has finished 1035 # pushing items onto the pipe. 1036 1037 def test_fork(self): 1038 # Old versions of Queue would fail to create a new feeder 1039 # thread for a forked process if the original process had its 1040 # own feeder thread. This test checks that this no longer 1041 # happens. 1042 1043 queue = self.Queue() 1044 1045 # put items on queue so that main process starts a feeder thread 1046 for i in range(10): 1047 queue.put(i) 1048 1049 # wait to make sure thread starts before we fork a new process 1050 time.sleep(DELTA) 1051 1052 # fork process 1053 p = self.Process(target=self._test_fork, args=(queue,)) 1054 p.daemon = True 1055 p.start() 1056 1057 # check that all expected items are in the queue 1058 for i in range(20): 1059 self.assertEqual(queue.get(), i) 1060 self.assertRaises(pyqueue.Empty, queue.get, False) 1061 1062 p.join() 1063 close_queue(queue) 1064 1065 def test_qsize(self): 1066 q = self.Queue() 1067 try: 1068 self.assertEqual(q.qsize(), 0) 1069 except NotImplementedError: 1070 self.skipTest('qsize method not implemented') 1071 q.put(1) 1072 self.assertEqual(q.qsize(), 1) 1073 q.put(5) 1074 self.assertEqual(q.qsize(), 2) 1075 q.get() 1076 self.assertEqual(q.qsize(), 1) 1077 q.get() 1078 self.assertEqual(q.qsize(), 0) 1079 close_queue(q) 1080 1081 @classmethod 1082 def _test_task_done(cls, q): 1083 for obj in iter(q.get, None): 1084 time.sleep(DELTA) 1085 q.task_done() 1086 1087 def test_task_done(self): 1088 queue = self.JoinableQueue() 1089 1090 workers = [self.Process(target=self._test_task_done, args=(queue,)) 1091 for i in range(4)] 1092 1093 for p in workers: 1094 p.daemon = True 1095 p.start() 1096 1097 for i in range(10): 1098 queue.put(i) 1099 1100 queue.join() 1101 1102 for p in workers: 1103 queue.put(None) 1104 1105 for p in workers: 1106 p.join() 1107 close_queue(queue) 1108 1109 def test_no_import_lock_contention(self): 1110 with test.support.temp_cwd(): 1111 module_name = 'imported_by_an_imported_module' 1112 with open(module_name + '.py', 'w') as f: 1113 f.write("""if 1: 1114 import multiprocessing 1115 1116 q = multiprocessing.Queue() 1117 q.put('knock knock') 1118 q.get(timeout=3) 1119 q.close() 1120 del q 1121 """) 1122 1123 with test.support.DirsOnSysPath(os.getcwd()): 1124 try: 1125 __import__(module_name) 1126 except pyqueue.Empty: 1127 self.fail("Probable regression on import lock contention;" 1128 " see Issue #22853") 1129 1130 def test_timeout(self): 1131 q = multiprocessing.Queue() 1132 start = time.monotonic() 1133 self.assertRaises(pyqueue.Empty, q.get, True, 0.200) 1134 delta = time.monotonic() - start 1135 # bpo-30317: Tolerate a delta of 100 ms because of the bad clock 1136 # resolution on Windows (usually 15.6 ms). x86 Windows7 3.x once 1137 # failed because the delta was only 135.8 ms. 1138 self.assertGreaterEqual(delta, 0.100) 1139 close_queue(q) 1140 1141 def test_queue_feeder_donot_stop_onexc(self): 1142 # bpo-30414: verify feeder handles exceptions correctly 1143 if self.TYPE != 'processes': 1144 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1145 1146 class NotSerializable(object): 1147 def __reduce__(self): 1148 raise AttributeError 1149 with test.support.captured_stderr(): 1150 q = self.Queue() 1151 q.put(NotSerializable()) 1152 q.put(True) 1153 self.assertTrue(q.get(timeout=TIMEOUT)) 1154 close_queue(q) 1155 1156 with test.support.captured_stderr(): 1157 # bpo-33078: verify that the queue size is correctly handled 1158 # on errors. 1159 q = self.Queue(maxsize=1) 1160 q.put(NotSerializable()) 1161 q.put(True) 1162 try: 1163 self.assertEqual(q.qsize(), 1) 1164 except NotImplementedError: 1165 # qsize is not available on all platform as it 1166 # relies on sem_getvalue 1167 pass 1168 # bpo-30595: use a timeout of 1 second for slow buildbots 1169 self.assertTrue(q.get(timeout=1.0)) 1170 # Check that the size of the queue is correct 1171 self.assertTrue(q.empty()) 1172 close_queue(q) 1173 1174 def test_queue_feeder_on_queue_feeder_error(self): 1175 # bpo-30006: verify feeder handles exceptions using the 1176 # _on_queue_feeder_error hook. 1177 if self.TYPE != 'processes': 1178 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1179 1180 class NotSerializable(object): 1181 """Mock unserializable object""" 1182 def __init__(self): 1183 self.reduce_was_called = False 1184 self.on_queue_feeder_error_was_called = False 1185 1186 def __reduce__(self): 1187 self.reduce_was_called = True 1188 raise AttributeError 1189 1190 class SafeQueue(multiprocessing.queues.Queue): 1191 """Queue with overloaded _on_queue_feeder_error hook""" 1192 @staticmethod 1193 def _on_queue_feeder_error(e, obj): 1194 if (isinstance(e, AttributeError) and 1195 isinstance(obj, NotSerializable)): 1196 obj.on_queue_feeder_error_was_called = True 1197 1198 not_serializable_obj = NotSerializable() 1199 # The captured_stderr reduces the noise in the test report 1200 with test.support.captured_stderr(): 1201 q = SafeQueue(ctx=multiprocessing.get_context()) 1202 q.put(not_serializable_obj) 1203 1204 # Verify that q is still functioning correctly 1205 q.put(True) 1206 self.assertTrue(q.get(timeout=1.0)) 1207 1208 # Assert that the serialization and the hook have been called correctly 1209 self.assertTrue(not_serializable_obj.reduce_was_called) 1210 self.assertTrue(not_serializable_obj.on_queue_feeder_error_was_called) 1211 1212 def test_closed_queue_put_get_exceptions(self): 1213 for q in multiprocessing.Queue(), multiprocessing.JoinableQueue(): 1214 q.close() 1215 with self.assertRaisesRegex(ValueError, 'is closed'): 1216 q.put('foo') 1217 with self.assertRaisesRegex(ValueError, 'is closed'): 1218 q.get() 1219# 1220# 1221# 1222 1223class _TestLock(BaseTestCase): 1224 1225 def test_lock(self): 1226 lock = self.Lock() 1227 self.assertEqual(lock.acquire(), True) 1228 self.assertEqual(lock.acquire(False), False) 1229 self.assertEqual(lock.release(), None) 1230 self.assertRaises((ValueError, threading.ThreadError), lock.release) 1231 1232 def test_rlock(self): 1233 lock = self.RLock() 1234 self.assertEqual(lock.acquire(), True) 1235 self.assertEqual(lock.acquire(), True) 1236 self.assertEqual(lock.acquire(), True) 1237 self.assertEqual(lock.release(), None) 1238 self.assertEqual(lock.release(), None) 1239 self.assertEqual(lock.release(), None) 1240 self.assertRaises((AssertionError, RuntimeError), lock.release) 1241 1242 def test_lock_context(self): 1243 with self.Lock(): 1244 pass 1245 1246 1247class _TestSemaphore(BaseTestCase): 1248 1249 def _test_semaphore(self, sem): 1250 self.assertReturnsIfImplemented(2, get_value, sem) 1251 self.assertEqual(sem.acquire(), True) 1252 self.assertReturnsIfImplemented(1, get_value, sem) 1253 self.assertEqual(sem.acquire(), True) 1254 self.assertReturnsIfImplemented(0, get_value, sem) 1255 self.assertEqual(sem.acquire(False), False) 1256 self.assertReturnsIfImplemented(0, get_value, sem) 1257 self.assertEqual(sem.release(), None) 1258 self.assertReturnsIfImplemented(1, get_value, sem) 1259 self.assertEqual(sem.release(), None) 1260 self.assertReturnsIfImplemented(2, get_value, sem) 1261 1262 def test_semaphore(self): 1263 sem = self.Semaphore(2) 1264 self._test_semaphore(sem) 1265 self.assertEqual(sem.release(), None) 1266 self.assertReturnsIfImplemented(3, get_value, sem) 1267 self.assertEqual(sem.release(), None) 1268 self.assertReturnsIfImplemented(4, get_value, sem) 1269 1270 def test_bounded_semaphore(self): 1271 sem = self.BoundedSemaphore(2) 1272 self._test_semaphore(sem) 1273 # Currently fails on OS/X 1274 #if HAVE_GETVALUE: 1275 # self.assertRaises(ValueError, sem.release) 1276 # self.assertReturnsIfImplemented(2, get_value, sem) 1277 1278 def test_timeout(self): 1279 if self.TYPE != 'processes': 1280 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1281 1282 sem = self.Semaphore(0) 1283 acquire = TimingWrapper(sem.acquire) 1284 1285 self.assertEqual(acquire(False), False) 1286 self.assertTimingAlmostEqual(acquire.elapsed, 0.0) 1287 1288 self.assertEqual(acquire(False, None), False) 1289 self.assertTimingAlmostEqual(acquire.elapsed, 0.0) 1290 1291 self.assertEqual(acquire(False, TIMEOUT1), False) 1292 self.assertTimingAlmostEqual(acquire.elapsed, 0) 1293 1294 self.assertEqual(acquire(True, TIMEOUT2), False) 1295 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2) 1296 1297 self.assertEqual(acquire(timeout=TIMEOUT3), False) 1298 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3) 1299 1300 1301class _TestCondition(BaseTestCase): 1302 1303 @classmethod 1304 def f(cls, cond, sleeping, woken, timeout=None): 1305 cond.acquire() 1306 sleeping.release() 1307 cond.wait(timeout) 1308 woken.release() 1309 cond.release() 1310 1311 def assertReachesEventually(self, func, value): 1312 for i in range(10): 1313 try: 1314 if func() == value: 1315 break 1316 except NotImplementedError: 1317 break 1318 time.sleep(DELTA) 1319 time.sleep(DELTA) 1320 self.assertReturnsIfImplemented(value, func) 1321 1322 def check_invariant(self, cond): 1323 # this is only supposed to succeed when there are no sleepers 1324 if self.TYPE == 'processes': 1325 try: 1326 sleepers = (cond._sleeping_count.get_value() - 1327 cond._woken_count.get_value()) 1328 self.assertEqual(sleepers, 0) 1329 self.assertEqual(cond._wait_semaphore.get_value(), 0) 1330 except NotImplementedError: 1331 pass 1332 1333 def test_notify(self): 1334 cond = self.Condition() 1335 sleeping = self.Semaphore(0) 1336 woken = self.Semaphore(0) 1337 1338 p = self.Process(target=self.f, args=(cond, sleeping, woken)) 1339 p.daemon = True 1340 p.start() 1341 self.addCleanup(p.join) 1342 1343 p = threading.Thread(target=self.f, args=(cond, sleeping, woken)) 1344 p.daemon = True 1345 p.start() 1346 self.addCleanup(p.join) 1347 1348 # wait for both children to start sleeping 1349 sleeping.acquire() 1350 sleeping.acquire() 1351 1352 # check no process/thread has woken up 1353 time.sleep(DELTA) 1354 self.assertReturnsIfImplemented(0, get_value, woken) 1355 1356 # wake up one process/thread 1357 cond.acquire() 1358 cond.notify() 1359 cond.release() 1360 1361 # check one process/thread has woken up 1362 time.sleep(DELTA) 1363 self.assertReturnsIfImplemented(1, get_value, woken) 1364 1365 # wake up another 1366 cond.acquire() 1367 cond.notify() 1368 cond.release() 1369 1370 # check other has woken up 1371 time.sleep(DELTA) 1372 self.assertReturnsIfImplemented(2, get_value, woken) 1373 1374 # check state is not mucked up 1375 self.check_invariant(cond) 1376 p.join() 1377 1378 def test_notify_all(self): 1379 cond = self.Condition() 1380 sleeping = self.Semaphore(0) 1381 woken = self.Semaphore(0) 1382 1383 # start some threads/processes which will timeout 1384 for i in range(3): 1385 p = self.Process(target=self.f, 1386 args=(cond, sleeping, woken, TIMEOUT1)) 1387 p.daemon = True 1388 p.start() 1389 self.addCleanup(p.join) 1390 1391 t = threading.Thread(target=self.f, 1392 args=(cond, sleeping, woken, TIMEOUT1)) 1393 t.daemon = True 1394 t.start() 1395 self.addCleanup(t.join) 1396 1397 # wait for them all to sleep 1398 for i in range(6): 1399 sleeping.acquire() 1400 1401 # check they have all timed out 1402 for i in range(6): 1403 woken.acquire() 1404 self.assertReturnsIfImplemented(0, get_value, woken) 1405 1406 # check state is not mucked up 1407 self.check_invariant(cond) 1408 1409 # start some more threads/processes 1410 for i in range(3): 1411 p = self.Process(target=self.f, args=(cond, sleeping, woken)) 1412 p.daemon = True 1413 p.start() 1414 self.addCleanup(p.join) 1415 1416 t = threading.Thread(target=self.f, args=(cond, sleeping, woken)) 1417 t.daemon = True 1418 t.start() 1419 self.addCleanup(t.join) 1420 1421 # wait for them to all sleep 1422 for i in range(6): 1423 sleeping.acquire() 1424 1425 # check no process/thread has woken up 1426 time.sleep(DELTA) 1427 self.assertReturnsIfImplemented(0, get_value, woken) 1428 1429 # wake them all up 1430 cond.acquire() 1431 cond.notify_all() 1432 cond.release() 1433 1434 # check they have all woken 1435 self.assertReachesEventually(lambda: get_value(woken), 6) 1436 1437 # check state is not mucked up 1438 self.check_invariant(cond) 1439 1440 def test_notify_n(self): 1441 cond = self.Condition() 1442 sleeping = self.Semaphore(0) 1443 woken = self.Semaphore(0) 1444 1445 # start some threads/processes 1446 for i in range(3): 1447 p = self.Process(target=self.f, args=(cond, sleeping, woken)) 1448 p.daemon = True 1449 p.start() 1450 self.addCleanup(p.join) 1451 1452 t = threading.Thread(target=self.f, args=(cond, sleeping, woken)) 1453 t.daemon = True 1454 t.start() 1455 self.addCleanup(t.join) 1456 1457 # wait for them to all sleep 1458 for i in range(6): 1459 sleeping.acquire() 1460 1461 # check no process/thread has woken up 1462 time.sleep(DELTA) 1463 self.assertReturnsIfImplemented(0, get_value, woken) 1464 1465 # wake some of them up 1466 cond.acquire() 1467 cond.notify(n=2) 1468 cond.release() 1469 1470 # check 2 have woken 1471 self.assertReachesEventually(lambda: get_value(woken), 2) 1472 1473 # wake the rest of them 1474 cond.acquire() 1475 cond.notify(n=4) 1476 cond.release() 1477 1478 self.assertReachesEventually(lambda: get_value(woken), 6) 1479 1480 # doesn't do anything more 1481 cond.acquire() 1482 cond.notify(n=3) 1483 cond.release() 1484 1485 self.assertReturnsIfImplemented(6, get_value, woken) 1486 1487 # check state is not mucked up 1488 self.check_invariant(cond) 1489 1490 def test_timeout(self): 1491 cond = self.Condition() 1492 wait = TimingWrapper(cond.wait) 1493 cond.acquire() 1494 res = wait(TIMEOUT1) 1495 cond.release() 1496 self.assertEqual(res, False) 1497 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1) 1498 1499 @classmethod 1500 def _test_waitfor_f(cls, cond, state): 1501 with cond: 1502 state.value = 0 1503 cond.notify() 1504 result = cond.wait_for(lambda : state.value==4) 1505 if not result or state.value != 4: 1506 sys.exit(1) 1507 1508 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes') 1509 def test_waitfor(self): 1510 # based on test in test/lock_tests.py 1511 cond = self.Condition() 1512 state = self.Value('i', -1) 1513 1514 p = self.Process(target=self._test_waitfor_f, args=(cond, state)) 1515 p.daemon = True 1516 p.start() 1517 1518 with cond: 1519 result = cond.wait_for(lambda : state.value==0) 1520 self.assertTrue(result) 1521 self.assertEqual(state.value, 0) 1522 1523 for i in range(4): 1524 time.sleep(0.01) 1525 with cond: 1526 state.value += 1 1527 cond.notify() 1528 1529 join_process(p) 1530 self.assertEqual(p.exitcode, 0) 1531 1532 @classmethod 1533 def _test_waitfor_timeout_f(cls, cond, state, success, sem): 1534 sem.release() 1535 with cond: 1536 expected = 0.1 1537 dt = time.monotonic() 1538 result = cond.wait_for(lambda : state.value==4, timeout=expected) 1539 dt = time.monotonic() - dt 1540 # borrow logic in assertTimeout() from test/lock_tests.py 1541 if not result and expected * 0.6 < dt < expected * 10.0: 1542 success.value = True 1543 1544 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes') 1545 def test_waitfor_timeout(self): 1546 # based on test in test/lock_tests.py 1547 cond = self.Condition() 1548 state = self.Value('i', 0) 1549 success = self.Value('i', False) 1550 sem = self.Semaphore(0) 1551 1552 p = self.Process(target=self._test_waitfor_timeout_f, 1553 args=(cond, state, success, sem)) 1554 p.daemon = True 1555 p.start() 1556 self.assertTrue(sem.acquire(timeout=TIMEOUT)) 1557 1558 # Only increment 3 times, so state == 4 is never reached. 1559 for i in range(3): 1560 time.sleep(0.01) 1561 with cond: 1562 state.value += 1 1563 cond.notify() 1564 1565 join_process(p) 1566 self.assertTrue(success.value) 1567 1568 @classmethod 1569 def _test_wait_result(cls, c, pid): 1570 with c: 1571 c.notify() 1572 time.sleep(1) 1573 if pid is not None: 1574 os.kill(pid, signal.SIGINT) 1575 1576 def test_wait_result(self): 1577 if isinstance(self, ProcessesMixin) and sys.platform != 'win32': 1578 pid = os.getpid() 1579 else: 1580 pid = None 1581 1582 c = self.Condition() 1583 with c: 1584 self.assertFalse(c.wait(0)) 1585 self.assertFalse(c.wait(0.1)) 1586 1587 p = self.Process(target=self._test_wait_result, args=(c, pid)) 1588 p.start() 1589 1590 self.assertTrue(c.wait(60)) 1591 if pid is not None: 1592 self.assertRaises(KeyboardInterrupt, c.wait, 60) 1593 1594 p.join() 1595 1596 1597class _TestEvent(BaseTestCase): 1598 1599 @classmethod 1600 def _test_event(cls, event): 1601 time.sleep(TIMEOUT2) 1602 event.set() 1603 1604 def test_event(self): 1605 event = self.Event() 1606 wait = TimingWrapper(event.wait) 1607 1608 # Removed temporarily, due to API shear, this does not 1609 # work with threading._Event objects. is_set == isSet 1610 self.assertEqual(event.is_set(), False) 1611 1612 # Removed, threading.Event.wait() will return the value of the __flag 1613 # instead of None. API Shear with the semaphore backed mp.Event 1614 self.assertEqual(wait(0.0), False) 1615 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 1616 self.assertEqual(wait(TIMEOUT1), False) 1617 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1) 1618 1619 event.set() 1620 1621 # See note above on the API differences 1622 self.assertEqual(event.is_set(), True) 1623 self.assertEqual(wait(), True) 1624 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 1625 self.assertEqual(wait(TIMEOUT1), True) 1626 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 1627 # self.assertEqual(event.is_set(), True) 1628 1629 event.clear() 1630 1631 #self.assertEqual(event.is_set(), False) 1632 1633 p = self.Process(target=self._test_event, args=(event,)) 1634 p.daemon = True 1635 p.start() 1636 self.assertEqual(wait(), True) 1637 p.join() 1638 1639# 1640# Tests for Barrier - adapted from tests in test/lock_tests.py 1641# 1642 1643# Many of the tests for threading.Barrier use a list as an atomic 1644# counter: a value is appended to increment the counter, and the 1645# length of the list gives the value. We use the class DummyList 1646# for the same purpose. 1647 1648class _DummyList(object): 1649 1650 def __init__(self): 1651 wrapper = multiprocessing.heap.BufferWrapper(struct.calcsize('i')) 1652 lock = multiprocessing.Lock() 1653 self.__setstate__((wrapper, lock)) 1654 self._lengthbuf[0] = 0 1655 1656 def __setstate__(self, state): 1657 (self._wrapper, self._lock) = state 1658 self._lengthbuf = self._wrapper.create_memoryview().cast('i') 1659 1660 def __getstate__(self): 1661 return (self._wrapper, self._lock) 1662 1663 def append(self, _): 1664 with self._lock: 1665 self._lengthbuf[0] += 1 1666 1667 def __len__(self): 1668 with self._lock: 1669 return self._lengthbuf[0] 1670 1671def _wait(): 1672 # A crude wait/yield function not relying on synchronization primitives. 1673 time.sleep(0.01) 1674 1675 1676class Bunch(object): 1677 """ 1678 A bunch of threads. 1679 """ 1680 def __init__(self, namespace, f, args, n, wait_before_exit=False): 1681 """ 1682 Construct a bunch of `n` threads running the same function `f`. 1683 If `wait_before_exit` is True, the threads won't terminate until 1684 do_finish() is called. 1685 """ 1686 self.f = f 1687 self.args = args 1688 self.n = n 1689 self.started = namespace.DummyList() 1690 self.finished = namespace.DummyList() 1691 self._can_exit = namespace.Event() 1692 if not wait_before_exit: 1693 self._can_exit.set() 1694 1695 threads = [] 1696 for i in range(n): 1697 p = namespace.Process(target=self.task) 1698 p.daemon = True 1699 p.start() 1700 threads.append(p) 1701 1702 def finalize(threads): 1703 for p in threads: 1704 p.join() 1705 1706 self._finalizer = weakref.finalize(self, finalize, threads) 1707 1708 def task(self): 1709 pid = os.getpid() 1710 self.started.append(pid) 1711 try: 1712 self.f(*self.args) 1713 finally: 1714 self.finished.append(pid) 1715 self._can_exit.wait(30) 1716 assert self._can_exit.is_set() 1717 1718 def wait_for_started(self): 1719 while len(self.started) < self.n: 1720 _wait() 1721 1722 def wait_for_finished(self): 1723 while len(self.finished) < self.n: 1724 _wait() 1725 1726 def do_finish(self): 1727 self._can_exit.set() 1728 1729 def close(self): 1730 self._finalizer() 1731 1732 1733class AppendTrue(object): 1734 def __init__(self, obj): 1735 self.obj = obj 1736 def __call__(self): 1737 self.obj.append(True) 1738 1739 1740class _TestBarrier(BaseTestCase): 1741 """ 1742 Tests for Barrier objects. 1743 """ 1744 N = 5 1745 defaultTimeout = 30.0 # XXX Slow Windows buildbots need generous timeout 1746 1747 def setUp(self): 1748 self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout) 1749 1750 def tearDown(self): 1751 self.barrier.abort() 1752 self.barrier = None 1753 1754 def DummyList(self): 1755 if self.TYPE == 'threads': 1756 return [] 1757 elif self.TYPE == 'manager': 1758 return self.manager.list() 1759 else: 1760 return _DummyList() 1761 1762 def run_threads(self, f, args): 1763 b = Bunch(self, f, args, self.N-1) 1764 try: 1765 f(*args) 1766 b.wait_for_finished() 1767 finally: 1768 b.close() 1769 1770 @classmethod 1771 def multipass(cls, barrier, results, n): 1772 m = barrier.parties 1773 assert m == cls.N 1774 for i in range(n): 1775 results[0].append(True) 1776 assert len(results[1]) == i * m 1777 barrier.wait() 1778 results[1].append(True) 1779 assert len(results[0]) == (i + 1) * m 1780 barrier.wait() 1781 try: 1782 assert barrier.n_waiting == 0 1783 except NotImplementedError: 1784 pass 1785 assert not barrier.broken 1786 1787 def test_barrier(self, passes=1): 1788 """ 1789 Test that a barrier is passed in lockstep 1790 """ 1791 results = [self.DummyList(), self.DummyList()] 1792 self.run_threads(self.multipass, (self.barrier, results, passes)) 1793 1794 def test_barrier_10(self): 1795 """ 1796 Test that a barrier works for 10 consecutive runs 1797 """ 1798 return self.test_barrier(10) 1799 1800 @classmethod 1801 def _test_wait_return_f(cls, barrier, queue): 1802 res = barrier.wait() 1803 queue.put(res) 1804 1805 def test_wait_return(self): 1806 """ 1807 test the return value from barrier.wait 1808 """ 1809 queue = self.Queue() 1810 self.run_threads(self._test_wait_return_f, (self.barrier, queue)) 1811 results = [queue.get() for i in range(self.N)] 1812 self.assertEqual(results.count(0), 1) 1813 close_queue(queue) 1814 1815 @classmethod 1816 def _test_action_f(cls, barrier, results): 1817 barrier.wait() 1818 if len(results) != 1: 1819 raise RuntimeError 1820 1821 def test_action(self): 1822 """ 1823 Test the 'action' callback 1824 """ 1825 results = self.DummyList() 1826 barrier = self.Barrier(self.N, action=AppendTrue(results)) 1827 self.run_threads(self._test_action_f, (barrier, results)) 1828 self.assertEqual(len(results), 1) 1829 1830 @classmethod 1831 def _test_abort_f(cls, barrier, results1, results2): 1832 try: 1833 i = barrier.wait() 1834 if i == cls.N//2: 1835 raise RuntimeError 1836 barrier.wait() 1837 results1.append(True) 1838 except threading.BrokenBarrierError: 1839 results2.append(True) 1840 except RuntimeError: 1841 barrier.abort() 1842 1843 def test_abort(self): 1844 """ 1845 Test that an abort will put the barrier in a broken state 1846 """ 1847 results1 = self.DummyList() 1848 results2 = self.DummyList() 1849 self.run_threads(self._test_abort_f, 1850 (self.barrier, results1, results2)) 1851 self.assertEqual(len(results1), 0) 1852 self.assertEqual(len(results2), self.N-1) 1853 self.assertTrue(self.barrier.broken) 1854 1855 @classmethod 1856 def _test_reset_f(cls, barrier, results1, results2, results3): 1857 i = barrier.wait() 1858 if i == cls.N//2: 1859 # Wait until the other threads are all in the barrier. 1860 while barrier.n_waiting < cls.N-1: 1861 time.sleep(0.001) 1862 barrier.reset() 1863 else: 1864 try: 1865 barrier.wait() 1866 results1.append(True) 1867 except threading.BrokenBarrierError: 1868 results2.append(True) 1869 # Now, pass the barrier again 1870 barrier.wait() 1871 results3.append(True) 1872 1873 def test_reset(self): 1874 """ 1875 Test that a 'reset' on a barrier frees the waiting threads 1876 """ 1877 results1 = self.DummyList() 1878 results2 = self.DummyList() 1879 results3 = self.DummyList() 1880 self.run_threads(self._test_reset_f, 1881 (self.barrier, results1, results2, results3)) 1882 self.assertEqual(len(results1), 0) 1883 self.assertEqual(len(results2), self.N-1) 1884 self.assertEqual(len(results3), self.N) 1885 1886 @classmethod 1887 def _test_abort_and_reset_f(cls, barrier, barrier2, 1888 results1, results2, results3): 1889 try: 1890 i = barrier.wait() 1891 if i == cls.N//2: 1892 raise RuntimeError 1893 barrier.wait() 1894 results1.append(True) 1895 except threading.BrokenBarrierError: 1896 results2.append(True) 1897 except RuntimeError: 1898 barrier.abort() 1899 # Synchronize and reset the barrier. Must synchronize first so 1900 # that everyone has left it when we reset, and after so that no 1901 # one enters it before the reset. 1902 if barrier2.wait() == cls.N//2: 1903 barrier.reset() 1904 barrier2.wait() 1905 barrier.wait() 1906 results3.append(True) 1907 1908 def test_abort_and_reset(self): 1909 """ 1910 Test that a barrier can be reset after being broken. 1911 """ 1912 results1 = self.DummyList() 1913 results2 = self.DummyList() 1914 results3 = self.DummyList() 1915 barrier2 = self.Barrier(self.N) 1916 1917 self.run_threads(self._test_abort_and_reset_f, 1918 (self.barrier, barrier2, results1, results2, results3)) 1919 self.assertEqual(len(results1), 0) 1920 self.assertEqual(len(results2), self.N-1) 1921 self.assertEqual(len(results3), self.N) 1922 1923 @classmethod 1924 def _test_timeout_f(cls, barrier, results): 1925 i = barrier.wait() 1926 if i == cls.N//2: 1927 # One thread is late! 1928 time.sleep(1.0) 1929 try: 1930 barrier.wait(0.5) 1931 except threading.BrokenBarrierError: 1932 results.append(True) 1933 1934 def test_timeout(self): 1935 """ 1936 Test wait(timeout) 1937 """ 1938 results = self.DummyList() 1939 self.run_threads(self._test_timeout_f, (self.barrier, results)) 1940 self.assertEqual(len(results), self.barrier.parties) 1941 1942 @classmethod 1943 def _test_default_timeout_f(cls, barrier, results): 1944 i = barrier.wait(cls.defaultTimeout) 1945 if i == cls.N//2: 1946 # One thread is later than the default timeout 1947 time.sleep(1.0) 1948 try: 1949 barrier.wait() 1950 except threading.BrokenBarrierError: 1951 results.append(True) 1952 1953 def test_default_timeout(self): 1954 """ 1955 Test the barrier's default timeout 1956 """ 1957 barrier = self.Barrier(self.N, timeout=0.5) 1958 results = self.DummyList() 1959 self.run_threads(self._test_default_timeout_f, (barrier, results)) 1960 self.assertEqual(len(results), barrier.parties) 1961 1962 def test_single_thread(self): 1963 b = self.Barrier(1) 1964 b.wait() 1965 b.wait() 1966 1967 @classmethod 1968 def _test_thousand_f(cls, barrier, passes, conn, lock): 1969 for i in range(passes): 1970 barrier.wait() 1971 with lock: 1972 conn.send(i) 1973 1974 def test_thousand(self): 1975 if self.TYPE == 'manager': 1976 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1977 passes = 1000 1978 lock = self.Lock() 1979 conn, child_conn = self.Pipe(False) 1980 for j in range(self.N): 1981 p = self.Process(target=self._test_thousand_f, 1982 args=(self.barrier, passes, child_conn, lock)) 1983 p.start() 1984 self.addCleanup(p.join) 1985 1986 for i in range(passes): 1987 for j in range(self.N): 1988 self.assertEqual(conn.recv(), i) 1989 1990# 1991# 1992# 1993 1994class _TestValue(BaseTestCase): 1995 1996 ALLOWED_TYPES = ('processes',) 1997 1998 codes_values = [ 1999 ('i', 4343, 24234), 2000 ('d', 3.625, -4.25), 2001 ('h', -232, 234), 2002 ('q', 2 ** 33, 2 ** 34), 2003 ('c', latin('x'), latin('y')) 2004 ] 2005 2006 def setUp(self): 2007 if not HAS_SHAREDCTYPES: 2008 self.skipTest("requires multiprocessing.sharedctypes") 2009 2010 @classmethod 2011 def _test(cls, values): 2012 for sv, cv in zip(values, cls.codes_values): 2013 sv.value = cv[2] 2014 2015 2016 def test_value(self, raw=False): 2017 if raw: 2018 values = [self.RawValue(code, value) 2019 for code, value, _ in self.codes_values] 2020 else: 2021 values = [self.Value(code, value) 2022 for code, value, _ in self.codes_values] 2023 2024 for sv, cv in zip(values, self.codes_values): 2025 self.assertEqual(sv.value, cv[1]) 2026 2027 proc = self.Process(target=self._test, args=(values,)) 2028 proc.daemon = True 2029 proc.start() 2030 proc.join() 2031 2032 for sv, cv in zip(values, self.codes_values): 2033 self.assertEqual(sv.value, cv[2]) 2034 2035 def test_rawvalue(self): 2036 self.test_value(raw=True) 2037 2038 def test_getobj_getlock(self): 2039 val1 = self.Value('i', 5) 2040 lock1 = val1.get_lock() 2041 obj1 = val1.get_obj() 2042 2043 val2 = self.Value('i', 5, lock=None) 2044 lock2 = val2.get_lock() 2045 obj2 = val2.get_obj() 2046 2047 lock = self.Lock() 2048 val3 = self.Value('i', 5, lock=lock) 2049 lock3 = val3.get_lock() 2050 obj3 = val3.get_obj() 2051 self.assertEqual(lock, lock3) 2052 2053 arr4 = self.Value('i', 5, lock=False) 2054 self.assertFalse(hasattr(arr4, 'get_lock')) 2055 self.assertFalse(hasattr(arr4, 'get_obj')) 2056 2057 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue') 2058 2059 arr5 = self.RawValue('i', 5) 2060 self.assertFalse(hasattr(arr5, 'get_lock')) 2061 self.assertFalse(hasattr(arr5, 'get_obj')) 2062 2063 2064class _TestArray(BaseTestCase): 2065 2066 ALLOWED_TYPES = ('processes',) 2067 2068 @classmethod 2069 def f(cls, seq): 2070 for i in range(1, len(seq)): 2071 seq[i] += seq[i-1] 2072 2073 @unittest.skipIf(c_int is None, "requires _ctypes") 2074 def test_array(self, raw=False): 2075 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831] 2076 if raw: 2077 arr = self.RawArray('i', seq) 2078 else: 2079 arr = self.Array('i', seq) 2080 2081 self.assertEqual(len(arr), len(seq)) 2082 self.assertEqual(arr[3], seq[3]) 2083 self.assertEqual(list(arr[2:7]), list(seq[2:7])) 2084 2085 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4]) 2086 2087 self.assertEqual(list(arr[:]), seq) 2088 2089 self.f(seq) 2090 2091 p = self.Process(target=self.f, args=(arr,)) 2092 p.daemon = True 2093 p.start() 2094 p.join() 2095 2096 self.assertEqual(list(arr[:]), seq) 2097 2098 @unittest.skipIf(c_int is None, "requires _ctypes") 2099 def test_array_from_size(self): 2100 size = 10 2101 # Test for zeroing (see issue #11675). 2102 # The repetition below strengthens the test by increasing the chances 2103 # of previously allocated non-zero memory being used for the new array 2104 # on the 2nd and 3rd loops. 2105 for _ in range(3): 2106 arr = self.Array('i', size) 2107 self.assertEqual(len(arr), size) 2108 self.assertEqual(list(arr), [0] * size) 2109 arr[:] = range(10) 2110 self.assertEqual(list(arr), list(range(10))) 2111 del arr 2112 2113 @unittest.skipIf(c_int is None, "requires _ctypes") 2114 def test_rawarray(self): 2115 self.test_array(raw=True) 2116 2117 @unittest.skipIf(c_int is None, "requires _ctypes") 2118 def test_getobj_getlock_obj(self): 2119 arr1 = self.Array('i', list(range(10))) 2120 lock1 = arr1.get_lock() 2121 obj1 = arr1.get_obj() 2122 2123 arr2 = self.Array('i', list(range(10)), lock=None) 2124 lock2 = arr2.get_lock() 2125 obj2 = arr2.get_obj() 2126 2127 lock = self.Lock() 2128 arr3 = self.Array('i', list(range(10)), lock=lock) 2129 lock3 = arr3.get_lock() 2130 obj3 = arr3.get_obj() 2131 self.assertEqual(lock, lock3) 2132 2133 arr4 = self.Array('i', range(10), lock=False) 2134 self.assertFalse(hasattr(arr4, 'get_lock')) 2135 self.assertFalse(hasattr(arr4, 'get_obj')) 2136 self.assertRaises(AttributeError, 2137 self.Array, 'i', range(10), lock='notalock') 2138 2139 arr5 = self.RawArray('i', range(10)) 2140 self.assertFalse(hasattr(arr5, 'get_lock')) 2141 self.assertFalse(hasattr(arr5, 'get_obj')) 2142 2143# 2144# 2145# 2146 2147class _TestContainers(BaseTestCase): 2148 2149 ALLOWED_TYPES = ('manager',) 2150 2151 def test_list(self): 2152 a = self.list(list(range(10))) 2153 self.assertEqual(a[:], list(range(10))) 2154 2155 b = self.list() 2156 self.assertEqual(b[:], []) 2157 2158 b.extend(list(range(5))) 2159 self.assertEqual(b[:], list(range(5))) 2160 2161 self.assertEqual(b[2], 2) 2162 self.assertEqual(b[2:10], [2,3,4]) 2163 2164 b *= 2 2165 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]) 2166 2167 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6]) 2168 2169 self.assertEqual(a[:], list(range(10))) 2170 2171 d = [a, b] 2172 e = self.list(d) 2173 self.assertEqual( 2174 [element[:] for element in e], 2175 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]] 2176 ) 2177 2178 f = self.list([a]) 2179 a.append('hello') 2180 self.assertEqual(f[0][:], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']) 2181 2182 def test_list_iter(self): 2183 a = self.list(list(range(10))) 2184 it = iter(a) 2185 self.assertEqual(list(it), list(range(10))) 2186 self.assertEqual(list(it), []) # exhausted 2187 # list modified during iteration 2188 it = iter(a) 2189 a[0] = 100 2190 self.assertEqual(next(it), 100) 2191 2192 def test_list_proxy_in_list(self): 2193 a = self.list([self.list(range(3)) for _i in range(3)]) 2194 self.assertEqual([inner[:] for inner in a], [[0, 1, 2]] * 3) 2195 2196 a[0][-1] = 55 2197 self.assertEqual(a[0][:], [0, 1, 55]) 2198 for i in range(1, 3): 2199 self.assertEqual(a[i][:], [0, 1, 2]) 2200 2201 self.assertEqual(a[1].pop(), 2) 2202 self.assertEqual(len(a[1]), 2) 2203 for i in range(0, 3, 2): 2204 self.assertEqual(len(a[i]), 3) 2205 2206 del a 2207 2208 b = self.list() 2209 b.append(b) 2210 del b 2211 2212 def test_dict(self): 2213 d = self.dict() 2214 indices = list(range(65, 70)) 2215 for i in indices: 2216 d[i] = chr(i) 2217 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices)) 2218 self.assertEqual(sorted(d.keys()), indices) 2219 self.assertEqual(sorted(d.values()), [chr(i) for i in indices]) 2220 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices]) 2221 2222 def test_dict_iter(self): 2223 d = self.dict() 2224 indices = list(range(65, 70)) 2225 for i in indices: 2226 d[i] = chr(i) 2227 it = iter(d) 2228 self.assertEqual(list(it), indices) 2229 self.assertEqual(list(it), []) # exhausted 2230 # dictionary changed size during iteration 2231 it = iter(d) 2232 d.clear() 2233 self.assertRaises(RuntimeError, next, it) 2234 2235 def test_dict_proxy_nested(self): 2236 pets = self.dict(ferrets=2, hamsters=4) 2237 supplies = self.dict(water=10, feed=3) 2238 d = self.dict(pets=pets, supplies=supplies) 2239 2240 self.assertEqual(supplies['water'], 10) 2241 self.assertEqual(d['supplies']['water'], 10) 2242 2243 d['supplies']['blankets'] = 5 2244 self.assertEqual(supplies['blankets'], 5) 2245 self.assertEqual(d['supplies']['blankets'], 5) 2246 2247 d['supplies']['water'] = 7 2248 self.assertEqual(supplies['water'], 7) 2249 self.assertEqual(d['supplies']['water'], 7) 2250 2251 del pets 2252 del supplies 2253 self.assertEqual(d['pets']['ferrets'], 2) 2254 d['supplies']['blankets'] = 11 2255 self.assertEqual(d['supplies']['blankets'], 11) 2256 2257 pets = d['pets'] 2258 supplies = d['supplies'] 2259 supplies['water'] = 7 2260 self.assertEqual(supplies['water'], 7) 2261 self.assertEqual(d['supplies']['water'], 7) 2262 2263 d.clear() 2264 self.assertEqual(len(d), 0) 2265 self.assertEqual(supplies['water'], 7) 2266 self.assertEqual(pets['hamsters'], 4) 2267 2268 l = self.list([pets, supplies]) 2269 l[0]['marmots'] = 1 2270 self.assertEqual(pets['marmots'], 1) 2271 self.assertEqual(l[0]['marmots'], 1) 2272 2273 del pets 2274 del supplies 2275 self.assertEqual(l[0]['marmots'], 1) 2276 2277 outer = self.list([[88, 99], l]) 2278 self.assertIsInstance(outer[0], list) # Not a ListProxy 2279 self.assertEqual(outer[-1][-1]['feed'], 3) 2280 2281 def test_namespace(self): 2282 n = self.Namespace() 2283 n.name = 'Bob' 2284 n.job = 'Builder' 2285 n._hidden = 'hidden' 2286 self.assertEqual((n.name, n.job), ('Bob', 'Builder')) 2287 del n.job 2288 self.assertEqual(str(n), "Namespace(name='Bob')") 2289 self.assertTrue(hasattr(n, 'name')) 2290 self.assertTrue(not hasattr(n, 'job')) 2291 2292# 2293# 2294# 2295 2296def sqr(x, wait=0.0): 2297 time.sleep(wait) 2298 return x*x 2299 2300def mul(x, y): 2301 return x*y 2302 2303def raise_large_valuerror(wait): 2304 time.sleep(wait) 2305 raise ValueError("x" * 1024**2) 2306 2307def identity(x): 2308 return x 2309 2310class CountedObject(object): 2311 n_instances = 0 2312 2313 def __new__(cls): 2314 cls.n_instances += 1 2315 return object.__new__(cls) 2316 2317 def __del__(self): 2318 type(self).n_instances -= 1 2319 2320class SayWhenError(ValueError): pass 2321 2322def exception_throwing_generator(total, when): 2323 if when == -1: 2324 raise SayWhenError("Somebody said when") 2325 for i in range(total): 2326 if i == when: 2327 raise SayWhenError("Somebody said when") 2328 yield i 2329 2330 2331class _TestPool(BaseTestCase): 2332 2333 @classmethod 2334 def setUpClass(cls): 2335 super().setUpClass() 2336 cls.pool = cls.Pool(4) 2337 2338 @classmethod 2339 def tearDownClass(cls): 2340 cls.pool.terminate() 2341 cls.pool.join() 2342 cls.pool = None 2343 super().tearDownClass() 2344 2345 def test_apply(self): 2346 papply = self.pool.apply 2347 self.assertEqual(papply(sqr, (5,)), sqr(5)) 2348 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3)) 2349 2350 def test_map(self): 2351 pmap = self.pool.map 2352 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10))))) 2353 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20), 2354 list(map(sqr, list(range(100))))) 2355 2356 def test_starmap(self): 2357 psmap = self.pool.starmap 2358 tuples = list(zip(range(10), range(9,-1, -1))) 2359 self.assertEqual(psmap(mul, tuples), 2360 list(itertools.starmap(mul, tuples))) 2361 tuples = list(zip(range(100), range(99,-1, -1))) 2362 self.assertEqual(psmap(mul, tuples, chunksize=20), 2363 list(itertools.starmap(mul, tuples))) 2364 2365 def test_starmap_async(self): 2366 tuples = list(zip(range(100), range(99,-1, -1))) 2367 self.assertEqual(self.pool.starmap_async(mul, tuples).get(), 2368 list(itertools.starmap(mul, tuples))) 2369 2370 def test_map_async(self): 2371 self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(), 2372 list(map(sqr, list(range(10))))) 2373 2374 def test_map_async_callbacks(self): 2375 call_args = self.manager.list() if self.TYPE == 'manager' else [] 2376 self.pool.map_async(int, ['1'], 2377 callback=call_args.append, 2378 error_callback=call_args.append).wait() 2379 self.assertEqual(1, len(call_args)) 2380 self.assertEqual([1], call_args[0]) 2381 self.pool.map_async(int, ['a'], 2382 callback=call_args.append, 2383 error_callback=call_args.append).wait() 2384 self.assertEqual(2, len(call_args)) 2385 self.assertIsInstance(call_args[1], ValueError) 2386 2387 def test_map_unplicklable(self): 2388 # Issue #19425 -- failure to pickle should not cause a hang 2389 if self.TYPE == 'threads': 2390 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 2391 class A(object): 2392 def __reduce__(self): 2393 raise RuntimeError('cannot pickle') 2394 with self.assertRaises(RuntimeError): 2395 self.pool.map(sqr, [A()]*10) 2396 2397 def test_map_chunksize(self): 2398 try: 2399 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1) 2400 except multiprocessing.TimeoutError: 2401 self.fail("pool.map_async with chunksize stalled on null list") 2402 2403 def test_map_handle_iterable_exception(self): 2404 if self.TYPE == 'manager': 2405 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 2406 2407 # SayWhenError seen at the very first of the iterable 2408 with self.assertRaises(SayWhenError): 2409 self.pool.map(sqr, exception_throwing_generator(1, -1), 1) 2410 # again, make sure it's reentrant 2411 with self.assertRaises(SayWhenError): 2412 self.pool.map(sqr, exception_throwing_generator(1, -1), 1) 2413 2414 with self.assertRaises(SayWhenError): 2415 self.pool.map(sqr, exception_throwing_generator(10, 3), 1) 2416 2417 class SpecialIterable: 2418 def __iter__(self): 2419 return self 2420 def __next__(self): 2421 raise SayWhenError 2422 def __len__(self): 2423 return 1 2424 with self.assertRaises(SayWhenError): 2425 self.pool.map(sqr, SpecialIterable(), 1) 2426 with self.assertRaises(SayWhenError): 2427 self.pool.map(sqr, SpecialIterable(), 1) 2428 2429 def test_async(self): 2430 res = self.pool.apply_async(sqr, (7, TIMEOUT1,)) 2431 get = TimingWrapper(res.get) 2432 self.assertEqual(get(), 49) 2433 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1) 2434 2435 def test_async_timeout(self): 2436 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0)) 2437 get = TimingWrapper(res.get) 2438 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2) 2439 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2) 2440 2441 def test_imap(self): 2442 it = self.pool.imap(sqr, list(range(10))) 2443 self.assertEqual(list(it), list(map(sqr, list(range(10))))) 2444 2445 it = self.pool.imap(sqr, list(range(10))) 2446 for i in range(10): 2447 self.assertEqual(next(it), i*i) 2448 self.assertRaises(StopIteration, it.__next__) 2449 2450 it = self.pool.imap(sqr, list(range(1000)), chunksize=100) 2451 for i in range(1000): 2452 self.assertEqual(next(it), i*i) 2453 self.assertRaises(StopIteration, it.__next__) 2454 2455 def test_imap_handle_iterable_exception(self): 2456 if self.TYPE == 'manager': 2457 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 2458 2459 # SayWhenError seen at the very first of the iterable 2460 it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1) 2461 self.assertRaises(SayWhenError, it.__next__) 2462 # again, make sure it's reentrant 2463 it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1) 2464 self.assertRaises(SayWhenError, it.__next__) 2465 2466 it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1) 2467 for i in range(3): 2468 self.assertEqual(next(it), i*i) 2469 self.assertRaises(SayWhenError, it.__next__) 2470 2471 # SayWhenError seen at start of problematic chunk's results 2472 it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2) 2473 for i in range(6): 2474 self.assertEqual(next(it), i*i) 2475 self.assertRaises(SayWhenError, it.__next__) 2476 it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4) 2477 for i in range(4): 2478 self.assertEqual(next(it), i*i) 2479 self.assertRaises(SayWhenError, it.__next__) 2480 2481 def test_imap_unordered(self): 2482 it = self.pool.imap_unordered(sqr, list(range(10))) 2483 self.assertEqual(sorted(it), list(map(sqr, list(range(10))))) 2484 2485 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=100) 2486 self.assertEqual(sorted(it), list(map(sqr, list(range(1000))))) 2487 2488 def test_imap_unordered_handle_iterable_exception(self): 2489 if self.TYPE == 'manager': 2490 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 2491 2492 # SayWhenError seen at the very first of the iterable 2493 it = self.pool.imap_unordered(sqr, 2494 exception_throwing_generator(1, -1), 2495 1) 2496 self.assertRaises(SayWhenError, it.__next__) 2497 # again, make sure it's reentrant 2498 it = self.pool.imap_unordered(sqr, 2499 exception_throwing_generator(1, -1), 2500 1) 2501 self.assertRaises(SayWhenError, it.__next__) 2502 2503 it = self.pool.imap_unordered(sqr, 2504 exception_throwing_generator(10, 3), 2505 1) 2506 expected_values = list(map(sqr, list(range(10)))) 2507 with self.assertRaises(SayWhenError): 2508 # imap_unordered makes it difficult to anticipate the SayWhenError 2509 for i in range(10): 2510 value = next(it) 2511 self.assertIn(value, expected_values) 2512 expected_values.remove(value) 2513 2514 it = self.pool.imap_unordered(sqr, 2515 exception_throwing_generator(20, 7), 2516 2) 2517 expected_values = list(map(sqr, list(range(20)))) 2518 with self.assertRaises(SayWhenError): 2519 for i in range(20): 2520 value = next(it) 2521 self.assertIn(value, expected_values) 2522 expected_values.remove(value) 2523 2524 def test_make_pool(self): 2525 expected_error = (RemoteError if self.TYPE == 'manager' 2526 else ValueError) 2527 2528 self.assertRaises(expected_error, self.Pool, -1) 2529 self.assertRaises(expected_error, self.Pool, 0) 2530 2531 if self.TYPE != 'manager': 2532 p = self.Pool(3) 2533 try: 2534 self.assertEqual(3, len(p._pool)) 2535 finally: 2536 p.close() 2537 p.join() 2538 2539 def test_terminate(self): 2540 result = self.pool.map_async( 2541 time.sleep, [0.1 for i in range(10000)], chunksize=1 2542 ) 2543 self.pool.terminate() 2544 join = TimingWrapper(self.pool.join) 2545 join() 2546 # Sanity check the pool didn't wait for all tasks to finish 2547 self.assertLess(join.elapsed, 2.0) 2548 2549 def test_empty_iterable(self): 2550 # See Issue 12157 2551 p = self.Pool(1) 2552 2553 self.assertEqual(p.map(sqr, []), []) 2554 self.assertEqual(list(p.imap(sqr, [])), []) 2555 self.assertEqual(list(p.imap_unordered(sqr, [])), []) 2556 self.assertEqual(p.map_async(sqr, []).get(), []) 2557 2558 p.close() 2559 p.join() 2560 2561 def test_context(self): 2562 if self.TYPE == 'processes': 2563 L = list(range(10)) 2564 expected = [sqr(i) for i in L] 2565 with self.Pool(2) as p: 2566 r = p.map_async(sqr, L) 2567 self.assertEqual(r.get(), expected) 2568 p.join() 2569 self.assertRaises(ValueError, p.map_async, sqr, L) 2570 2571 @classmethod 2572 def _test_traceback(cls): 2573 raise RuntimeError(123) # some comment 2574 2575 def test_traceback(self): 2576 # We want ensure that the traceback from the child process is 2577 # contained in the traceback raised in the main process. 2578 if self.TYPE == 'processes': 2579 with self.Pool(1) as p: 2580 try: 2581 p.apply(self._test_traceback) 2582 except Exception as e: 2583 exc = e 2584 else: 2585 self.fail('expected RuntimeError') 2586 p.join() 2587 self.assertIs(type(exc), RuntimeError) 2588 self.assertEqual(exc.args, (123,)) 2589 cause = exc.__cause__ 2590 self.assertIs(type(cause), multiprocessing.pool.RemoteTraceback) 2591 self.assertIn('raise RuntimeError(123) # some comment', cause.tb) 2592 2593 with test.support.captured_stderr() as f1: 2594 try: 2595 raise exc 2596 except RuntimeError: 2597 sys.excepthook(*sys.exc_info()) 2598 self.assertIn('raise RuntimeError(123) # some comment', 2599 f1.getvalue()) 2600 # _helper_reraises_exception should not make the error 2601 # a remote exception 2602 with self.Pool(1) as p: 2603 try: 2604 p.map(sqr, exception_throwing_generator(1, -1), 1) 2605 except Exception as e: 2606 exc = e 2607 else: 2608 self.fail('expected SayWhenError') 2609 self.assertIs(type(exc), SayWhenError) 2610 self.assertIs(exc.__cause__, None) 2611 p.join() 2612 2613 @classmethod 2614 def _test_wrapped_exception(cls): 2615 raise RuntimeError('foo') 2616 2617 def test_wrapped_exception(self): 2618 # Issue #20980: Should not wrap exception when using thread pool 2619 with self.Pool(1) as p: 2620 with self.assertRaises(RuntimeError): 2621 p.apply(self._test_wrapped_exception) 2622 p.join() 2623 2624 def test_map_no_failfast(self): 2625 # Issue #23992: the fail-fast behaviour when an exception is raised 2626 # during map() would make Pool.join() deadlock, because a worker 2627 # process would fill the result queue (after the result handler thread 2628 # terminated, hence not draining it anymore). 2629 2630 t_start = time.monotonic() 2631 2632 with self.assertRaises(ValueError): 2633 with self.Pool(2) as p: 2634 try: 2635 p.map(raise_large_valuerror, [0, 1]) 2636 finally: 2637 time.sleep(0.5) 2638 p.close() 2639 p.join() 2640 2641 # check that we indeed waited for all jobs 2642 self.assertGreater(time.monotonic() - t_start, 0.9) 2643 2644 def test_release_task_refs(self): 2645 # Issue #29861: task arguments and results should not be kept 2646 # alive after we are done with them. 2647 objs = [CountedObject() for i in range(10)] 2648 refs = [weakref.ref(o) for o in objs] 2649 self.pool.map(identity, objs) 2650 2651 del objs 2652 time.sleep(DELTA) # let threaded cleanup code run 2653 self.assertEqual(set(wr() for wr in refs), {None}) 2654 # With a process pool, copies of the objects are returned, check 2655 # they were released too. 2656 self.assertEqual(CountedObject.n_instances, 0) 2657 2658 def test_enter(self): 2659 if self.TYPE == 'manager': 2660 self.skipTest("test not applicable to manager") 2661 2662 pool = self.Pool(1) 2663 with pool: 2664 pass 2665 # call pool.terminate() 2666 # pool is no longer running 2667 2668 with self.assertRaises(ValueError): 2669 # bpo-35477: pool.__enter__() fails if the pool is not running 2670 with pool: 2671 pass 2672 pool.join() 2673 2674 def test_resource_warning(self): 2675 if self.TYPE == 'manager': 2676 self.skipTest("test not applicable to manager") 2677 2678 pool = self.Pool(1) 2679 pool.terminate() 2680 pool.join() 2681 2682 # force state to RUN to emit ResourceWarning in __del__() 2683 pool._state = multiprocessing.pool.RUN 2684 2685 with support.check_warnings(('unclosed running multiprocessing pool', 2686 ResourceWarning)): 2687 pool = None 2688 support.gc_collect() 2689 2690def raising(): 2691 raise KeyError("key") 2692 2693def unpickleable_result(): 2694 return lambda: 42 2695 2696class _TestPoolWorkerErrors(BaseTestCase): 2697 ALLOWED_TYPES = ('processes', ) 2698 2699 def test_async_error_callback(self): 2700 p = multiprocessing.Pool(2) 2701 2702 scratchpad = [None] 2703 def errback(exc): 2704 scratchpad[0] = exc 2705 2706 res = p.apply_async(raising, error_callback=errback) 2707 self.assertRaises(KeyError, res.get) 2708 self.assertTrue(scratchpad[0]) 2709 self.assertIsInstance(scratchpad[0], KeyError) 2710 2711 p.close() 2712 p.join() 2713 2714 def test_unpickleable_result(self): 2715 from multiprocessing.pool import MaybeEncodingError 2716 p = multiprocessing.Pool(2) 2717 2718 # Make sure we don't lose pool processes because of encoding errors. 2719 for iteration in range(20): 2720 2721 scratchpad = [None] 2722 def errback(exc): 2723 scratchpad[0] = exc 2724 2725 res = p.apply_async(unpickleable_result, error_callback=errback) 2726 self.assertRaises(MaybeEncodingError, res.get) 2727 wrapped = scratchpad[0] 2728 self.assertTrue(wrapped) 2729 self.assertIsInstance(scratchpad[0], MaybeEncodingError) 2730 self.assertIsNotNone(wrapped.exc) 2731 self.assertIsNotNone(wrapped.value) 2732 2733 p.close() 2734 p.join() 2735 2736class _TestPoolWorkerLifetime(BaseTestCase): 2737 ALLOWED_TYPES = ('processes', ) 2738 2739 def test_pool_worker_lifetime(self): 2740 p = multiprocessing.Pool(3, maxtasksperchild=10) 2741 self.assertEqual(3, len(p._pool)) 2742 origworkerpids = [w.pid for w in p._pool] 2743 # Run many tasks so each worker gets replaced (hopefully) 2744 results = [] 2745 for i in range(100): 2746 results.append(p.apply_async(sqr, (i, ))) 2747 # Fetch the results and verify we got the right answers, 2748 # also ensuring all the tasks have completed. 2749 for (j, res) in enumerate(results): 2750 self.assertEqual(res.get(), sqr(j)) 2751 # Refill the pool 2752 p._repopulate_pool() 2753 # Wait until all workers are alive 2754 # (countdown * DELTA = 5 seconds max startup process time) 2755 countdown = 50 2756 while countdown and not all(w.is_alive() for w in p._pool): 2757 countdown -= 1 2758 time.sleep(DELTA) 2759 finalworkerpids = [w.pid for w in p._pool] 2760 # All pids should be assigned. See issue #7805. 2761 self.assertNotIn(None, origworkerpids) 2762 self.assertNotIn(None, finalworkerpids) 2763 # Finally, check that the worker pids have changed 2764 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids)) 2765 p.close() 2766 p.join() 2767 2768 def test_pool_worker_lifetime_early_close(self): 2769 # Issue #10332: closing a pool whose workers have limited lifetimes 2770 # before all the tasks completed would make join() hang. 2771 p = multiprocessing.Pool(3, maxtasksperchild=1) 2772 results = [] 2773 for i in range(6): 2774 results.append(p.apply_async(sqr, (i, 0.3))) 2775 p.close() 2776 p.join() 2777 # check the results 2778 for (j, res) in enumerate(results): 2779 self.assertEqual(res.get(), sqr(j)) 2780 2781 def test_worker_finalization_via_atexit_handler_of_multiprocessing(self): 2782 # tests cases against bpo-38744 and bpo-39360 2783 cmd = '''if 1: 2784 from multiprocessing import Pool 2785 problem = None 2786 class A: 2787 def __init__(self): 2788 self.pool = Pool(processes=1) 2789 def test(): 2790 global problem 2791 problem = A() 2792 problem.pool.map(float, tuple(range(10))) 2793 if __name__ == "__main__": 2794 test() 2795 ''' 2796 rc, out, err = test.support.script_helper.assert_python_ok('-c', cmd) 2797 self.assertEqual(rc, 0) 2798 2799# 2800# Test of creating a customized manager class 2801# 2802 2803from multiprocessing.managers import BaseManager, BaseProxy, RemoteError 2804 2805class FooBar(object): 2806 def f(self): 2807 return 'f()' 2808 def g(self): 2809 raise ValueError 2810 def _h(self): 2811 return '_h()' 2812 2813def baz(): 2814 for i in range(10): 2815 yield i*i 2816 2817class IteratorProxy(BaseProxy): 2818 _exposed_ = ('__next__',) 2819 def __iter__(self): 2820 return self 2821 def __next__(self): 2822 return self._callmethod('__next__') 2823 2824class MyManager(BaseManager): 2825 pass 2826 2827MyManager.register('Foo', callable=FooBar) 2828MyManager.register('Bar', callable=FooBar, exposed=('f', '_h')) 2829MyManager.register('baz', callable=baz, proxytype=IteratorProxy) 2830 2831 2832class _TestMyManager(BaseTestCase): 2833 2834 ALLOWED_TYPES = ('manager',) 2835 2836 def test_mymanager(self): 2837 manager = MyManager() 2838 manager.start() 2839 self.common(manager) 2840 manager.shutdown() 2841 2842 # bpo-30356: BaseManager._finalize_manager() sends SIGTERM 2843 # to the manager process if it takes longer than 1 second to stop, 2844 # which happens on slow buildbots. 2845 self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM)) 2846 2847 def test_mymanager_context(self): 2848 with MyManager() as manager: 2849 self.common(manager) 2850 # bpo-30356: BaseManager._finalize_manager() sends SIGTERM 2851 # to the manager process if it takes longer than 1 second to stop, 2852 # which happens on slow buildbots. 2853 self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM)) 2854 2855 def test_mymanager_context_prestarted(self): 2856 manager = MyManager() 2857 manager.start() 2858 with manager: 2859 self.common(manager) 2860 self.assertEqual(manager._process.exitcode, 0) 2861 2862 def common(self, manager): 2863 foo = manager.Foo() 2864 bar = manager.Bar() 2865 baz = manager.baz() 2866 2867 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)] 2868 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)] 2869 2870 self.assertEqual(foo_methods, ['f', 'g']) 2871 self.assertEqual(bar_methods, ['f', '_h']) 2872 2873 self.assertEqual(foo.f(), 'f()') 2874 self.assertRaises(ValueError, foo.g) 2875 self.assertEqual(foo._callmethod('f'), 'f()') 2876 self.assertRaises(RemoteError, foo._callmethod, '_h') 2877 2878 self.assertEqual(bar.f(), 'f()') 2879 self.assertEqual(bar._h(), '_h()') 2880 self.assertEqual(bar._callmethod('f'), 'f()') 2881 self.assertEqual(bar._callmethod('_h'), '_h()') 2882 2883 self.assertEqual(list(baz), [i*i for i in range(10)]) 2884 2885 2886# 2887# Test of connecting to a remote server and using xmlrpclib for serialization 2888# 2889 2890_queue = pyqueue.Queue() 2891def get_queue(): 2892 return _queue 2893 2894class QueueManager(BaseManager): 2895 '''manager class used by server process''' 2896QueueManager.register('get_queue', callable=get_queue) 2897 2898class QueueManager2(BaseManager): 2899 '''manager class which specifies the same interface as QueueManager''' 2900QueueManager2.register('get_queue') 2901 2902 2903SERIALIZER = 'xmlrpclib' 2904 2905class _TestRemoteManager(BaseTestCase): 2906 2907 ALLOWED_TYPES = ('manager',) 2908 values = ['hello world', None, True, 2.25, 2909 'hall\xe5 v\xe4rlden', 2910 '\u043f\u0440\u0438\u0432\u0456\u0442 \u0441\u0432\u0456\u0442', 2911 b'hall\xe5 v\xe4rlden', 2912 ] 2913 result = values[:] 2914 2915 @classmethod 2916 def _putter(cls, address, authkey): 2917 manager = QueueManager2( 2918 address=address, authkey=authkey, serializer=SERIALIZER 2919 ) 2920 manager.connect() 2921 queue = manager.get_queue() 2922 # Note that xmlrpclib will deserialize object as a list not a tuple 2923 queue.put(tuple(cls.values)) 2924 2925 def test_remote(self): 2926 authkey = os.urandom(32) 2927 2928 manager = QueueManager( 2929 address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER 2930 ) 2931 manager.start() 2932 self.addCleanup(manager.shutdown) 2933 2934 p = self.Process(target=self._putter, args=(manager.address, authkey)) 2935 p.daemon = True 2936 p.start() 2937 2938 manager2 = QueueManager2( 2939 address=manager.address, authkey=authkey, serializer=SERIALIZER 2940 ) 2941 manager2.connect() 2942 queue = manager2.get_queue() 2943 2944 self.assertEqual(queue.get(), self.result) 2945 2946 # Because we are using xmlrpclib for serialization instead of 2947 # pickle this will cause a serialization error. 2948 self.assertRaises(Exception, queue.put, time.sleep) 2949 2950 # Make queue finalizer run before the server is stopped 2951 del queue 2952 2953class _TestManagerRestart(BaseTestCase): 2954 2955 @classmethod 2956 def _putter(cls, address, authkey): 2957 manager = QueueManager( 2958 address=address, authkey=authkey, serializer=SERIALIZER) 2959 manager.connect() 2960 queue = manager.get_queue() 2961 queue.put('hello world') 2962 2963 def test_rapid_restart(self): 2964 authkey = os.urandom(32) 2965 manager = QueueManager( 2966 address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER) 2967 try: 2968 srvr = manager.get_server() 2969 addr = srvr.address 2970 # Close the connection.Listener socket which gets opened as a part 2971 # of manager.get_server(). It's not needed for the test. 2972 srvr.listener.close() 2973 manager.start() 2974 2975 p = self.Process(target=self._putter, args=(manager.address, authkey)) 2976 p.start() 2977 p.join() 2978 queue = manager.get_queue() 2979 self.assertEqual(queue.get(), 'hello world') 2980 del queue 2981 finally: 2982 if hasattr(manager, "shutdown"): 2983 manager.shutdown() 2984 2985 manager = QueueManager( 2986 address=addr, authkey=authkey, serializer=SERIALIZER) 2987 try: 2988 manager.start() 2989 self.addCleanup(manager.shutdown) 2990 except OSError as e: 2991 if e.errno != errno.EADDRINUSE: 2992 raise 2993 # Retry after some time, in case the old socket was lingering 2994 # (sporadic failure on buildbots) 2995 time.sleep(1.0) 2996 manager = QueueManager( 2997 address=addr, authkey=authkey, serializer=SERIALIZER) 2998 if hasattr(manager, "shutdown"): 2999 self.addCleanup(manager.shutdown) 3000 3001# 3002# 3003# 3004 3005SENTINEL = latin('') 3006 3007class _TestConnection(BaseTestCase): 3008 3009 ALLOWED_TYPES = ('processes', 'threads') 3010 3011 @classmethod 3012 def _echo(cls, conn): 3013 for msg in iter(conn.recv_bytes, SENTINEL): 3014 conn.send_bytes(msg) 3015 conn.close() 3016 3017 def test_connection(self): 3018 conn, child_conn = self.Pipe() 3019 3020 p = self.Process(target=self._echo, args=(child_conn,)) 3021 p.daemon = True 3022 p.start() 3023 3024 seq = [1, 2.25, None] 3025 msg = latin('hello world') 3026 longmsg = msg * 10 3027 arr = array.array('i', list(range(4))) 3028 3029 if self.TYPE == 'processes': 3030 self.assertEqual(type(conn.fileno()), int) 3031 3032 self.assertEqual(conn.send(seq), None) 3033 self.assertEqual(conn.recv(), seq) 3034 3035 self.assertEqual(conn.send_bytes(msg), None) 3036 self.assertEqual(conn.recv_bytes(), msg) 3037 3038 if self.TYPE == 'processes': 3039 buffer = array.array('i', [0]*10) 3040 expected = list(arr) + [0] * (10 - len(arr)) 3041 self.assertEqual(conn.send_bytes(arr), None) 3042 self.assertEqual(conn.recv_bytes_into(buffer), 3043 len(arr) * buffer.itemsize) 3044 self.assertEqual(list(buffer), expected) 3045 3046 buffer = array.array('i', [0]*10) 3047 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr)) 3048 self.assertEqual(conn.send_bytes(arr), None) 3049 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize), 3050 len(arr) * buffer.itemsize) 3051 self.assertEqual(list(buffer), expected) 3052 3053 buffer = bytearray(latin(' ' * 40)) 3054 self.assertEqual(conn.send_bytes(longmsg), None) 3055 try: 3056 res = conn.recv_bytes_into(buffer) 3057 except multiprocessing.BufferTooShort as e: 3058 self.assertEqual(e.args, (longmsg,)) 3059 else: 3060 self.fail('expected BufferTooShort, got %s' % res) 3061 3062 poll = TimingWrapper(conn.poll) 3063 3064 self.assertEqual(poll(), False) 3065 self.assertTimingAlmostEqual(poll.elapsed, 0) 3066 3067 self.assertEqual(poll(-1), False) 3068 self.assertTimingAlmostEqual(poll.elapsed, 0) 3069 3070 self.assertEqual(poll(TIMEOUT1), False) 3071 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1) 3072 3073 conn.send(None) 3074 time.sleep(.1) 3075 3076 self.assertEqual(poll(TIMEOUT1), True) 3077 self.assertTimingAlmostEqual(poll.elapsed, 0) 3078 3079 self.assertEqual(conn.recv(), None) 3080 3081 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb 3082 conn.send_bytes(really_big_msg) 3083 self.assertEqual(conn.recv_bytes(), really_big_msg) 3084 3085 conn.send_bytes(SENTINEL) # tell child to quit 3086 child_conn.close() 3087 3088 if self.TYPE == 'processes': 3089 self.assertEqual(conn.readable, True) 3090 self.assertEqual(conn.writable, True) 3091 self.assertRaises(EOFError, conn.recv) 3092 self.assertRaises(EOFError, conn.recv_bytes) 3093 3094 p.join() 3095 3096 def test_duplex_false(self): 3097 reader, writer = self.Pipe(duplex=False) 3098 self.assertEqual(writer.send(1), None) 3099 self.assertEqual(reader.recv(), 1) 3100 if self.TYPE == 'processes': 3101 self.assertEqual(reader.readable, True) 3102 self.assertEqual(reader.writable, False) 3103 self.assertEqual(writer.readable, False) 3104 self.assertEqual(writer.writable, True) 3105 self.assertRaises(OSError, reader.send, 2) 3106 self.assertRaises(OSError, writer.recv) 3107 self.assertRaises(OSError, writer.poll) 3108 3109 def test_spawn_close(self): 3110 # We test that a pipe connection can be closed by parent 3111 # process immediately after child is spawned. On Windows this 3112 # would have sometimes failed on old versions because 3113 # child_conn would be closed before the child got a chance to 3114 # duplicate it. 3115 conn, child_conn = self.Pipe() 3116 3117 p = self.Process(target=self._echo, args=(child_conn,)) 3118 p.daemon = True 3119 p.start() 3120 child_conn.close() # this might complete before child initializes 3121 3122 msg = latin('hello') 3123 conn.send_bytes(msg) 3124 self.assertEqual(conn.recv_bytes(), msg) 3125 3126 conn.send_bytes(SENTINEL) 3127 conn.close() 3128 p.join() 3129 3130 def test_sendbytes(self): 3131 if self.TYPE != 'processes': 3132 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 3133 3134 msg = latin('abcdefghijklmnopqrstuvwxyz') 3135 a, b = self.Pipe() 3136 3137 a.send_bytes(msg) 3138 self.assertEqual(b.recv_bytes(), msg) 3139 3140 a.send_bytes(msg, 5) 3141 self.assertEqual(b.recv_bytes(), msg[5:]) 3142 3143 a.send_bytes(msg, 7, 8) 3144 self.assertEqual(b.recv_bytes(), msg[7:7+8]) 3145 3146 a.send_bytes(msg, 26) 3147 self.assertEqual(b.recv_bytes(), latin('')) 3148 3149 a.send_bytes(msg, 26, 0) 3150 self.assertEqual(b.recv_bytes(), latin('')) 3151 3152 self.assertRaises(ValueError, a.send_bytes, msg, 27) 3153 3154 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5) 3155 3156 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1) 3157 3158 self.assertRaises(ValueError, a.send_bytes, msg, -1) 3159 3160 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1) 3161 3162 @classmethod 3163 def _is_fd_assigned(cls, fd): 3164 try: 3165 os.fstat(fd) 3166 except OSError as e: 3167 if e.errno == errno.EBADF: 3168 return False 3169 raise 3170 else: 3171 return True 3172 3173 @classmethod 3174 def _writefd(cls, conn, data, create_dummy_fds=False): 3175 if create_dummy_fds: 3176 for i in range(0, 256): 3177 if not cls._is_fd_assigned(i): 3178 os.dup2(conn.fileno(), i) 3179 fd = reduction.recv_handle(conn) 3180 if msvcrt: 3181 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY) 3182 os.write(fd, data) 3183 os.close(fd) 3184 3185 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 3186 def test_fd_transfer(self): 3187 if self.TYPE != 'processes': 3188 self.skipTest("only makes sense with processes") 3189 conn, child_conn = self.Pipe(duplex=True) 3190 3191 p = self.Process(target=self._writefd, args=(child_conn, b"foo")) 3192 p.daemon = True 3193 p.start() 3194 self.addCleanup(test.support.unlink, test.support.TESTFN) 3195 with open(test.support.TESTFN, "wb") as f: 3196 fd = f.fileno() 3197 if msvcrt: 3198 fd = msvcrt.get_osfhandle(fd) 3199 reduction.send_handle(conn, fd, p.pid) 3200 p.join() 3201 with open(test.support.TESTFN, "rb") as f: 3202 self.assertEqual(f.read(), b"foo") 3203 3204 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 3205 @unittest.skipIf(sys.platform == "win32", 3206 "test semantics don't make sense on Windows") 3207 @unittest.skipIf(MAXFD <= 256, 3208 "largest assignable fd number is too small") 3209 @unittest.skipUnless(hasattr(os, "dup2"), 3210 "test needs os.dup2()") 3211 def test_large_fd_transfer(self): 3212 # With fd > 256 (issue #11657) 3213 if self.TYPE != 'processes': 3214 self.skipTest("only makes sense with processes") 3215 conn, child_conn = self.Pipe(duplex=True) 3216 3217 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True)) 3218 p.daemon = True 3219 p.start() 3220 self.addCleanup(test.support.unlink, test.support.TESTFN) 3221 with open(test.support.TESTFN, "wb") as f: 3222 fd = f.fileno() 3223 for newfd in range(256, MAXFD): 3224 if not self._is_fd_assigned(newfd): 3225 break 3226 else: 3227 self.fail("could not find an unassigned large file descriptor") 3228 os.dup2(fd, newfd) 3229 try: 3230 reduction.send_handle(conn, newfd, p.pid) 3231 finally: 3232 os.close(newfd) 3233 p.join() 3234 with open(test.support.TESTFN, "rb") as f: 3235 self.assertEqual(f.read(), b"bar") 3236 3237 @classmethod 3238 def _send_data_without_fd(self, conn): 3239 os.write(conn.fileno(), b"\0") 3240 3241 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 3242 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows") 3243 def test_missing_fd_transfer(self): 3244 # Check that exception is raised when received data is not 3245 # accompanied by a file descriptor in ancillary data. 3246 if self.TYPE != 'processes': 3247 self.skipTest("only makes sense with processes") 3248 conn, child_conn = self.Pipe(duplex=True) 3249 3250 p = self.Process(target=self._send_data_without_fd, args=(child_conn,)) 3251 p.daemon = True 3252 p.start() 3253 self.assertRaises(RuntimeError, reduction.recv_handle, conn) 3254 p.join() 3255 3256 def test_context(self): 3257 a, b = self.Pipe() 3258 3259 with a, b: 3260 a.send(1729) 3261 self.assertEqual(b.recv(), 1729) 3262 if self.TYPE == 'processes': 3263 self.assertFalse(a.closed) 3264 self.assertFalse(b.closed) 3265 3266 if self.TYPE == 'processes': 3267 self.assertTrue(a.closed) 3268 self.assertTrue(b.closed) 3269 self.assertRaises(OSError, a.recv) 3270 self.assertRaises(OSError, b.recv) 3271 3272class _TestListener(BaseTestCase): 3273 3274 ALLOWED_TYPES = ('processes',) 3275 3276 def test_multiple_bind(self): 3277 for family in self.connection.families: 3278 l = self.connection.Listener(family=family) 3279 self.addCleanup(l.close) 3280 self.assertRaises(OSError, self.connection.Listener, 3281 l.address, family) 3282 3283 def test_context(self): 3284 with self.connection.Listener() as l: 3285 with self.connection.Client(l.address) as c: 3286 with l.accept() as d: 3287 c.send(1729) 3288 self.assertEqual(d.recv(), 1729) 3289 3290 if self.TYPE == 'processes': 3291 self.assertRaises(OSError, l.accept) 3292 3293 @unittest.skipUnless(util.abstract_sockets_supported, 3294 "test needs abstract socket support") 3295 def test_abstract_socket(self): 3296 with self.connection.Listener("\0something") as listener: 3297 with self.connection.Client(listener.address) as client: 3298 with listener.accept() as d: 3299 client.send(1729) 3300 self.assertEqual(d.recv(), 1729) 3301 3302 if self.TYPE == 'processes': 3303 self.assertRaises(OSError, listener.accept) 3304 3305 3306class _TestListenerClient(BaseTestCase): 3307 3308 ALLOWED_TYPES = ('processes', 'threads') 3309 3310 @classmethod 3311 def _test(cls, address): 3312 conn = cls.connection.Client(address) 3313 conn.send('hello') 3314 conn.close() 3315 3316 def test_listener_client(self): 3317 for family in self.connection.families: 3318 l = self.connection.Listener(family=family) 3319 p = self.Process(target=self._test, args=(l.address,)) 3320 p.daemon = True 3321 p.start() 3322 conn = l.accept() 3323 self.assertEqual(conn.recv(), 'hello') 3324 p.join() 3325 l.close() 3326 3327 def test_issue14725(self): 3328 l = self.connection.Listener() 3329 p = self.Process(target=self._test, args=(l.address,)) 3330 p.daemon = True 3331 p.start() 3332 time.sleep(1) 3333 # On Windows the client process should by now have connected, 3334 # written data and closed the pipe handle by now. This causes 3335 # ConnectNamdedPipe() to fail with ERROR_NO_DATA. See Issue 3336 # 14725. 3337 conn = l.accept() 3338 self.assertEqual(conn.recv(), 'hello') 3339 conn.close() 3340 p.join() 3341 l.close() 3342 3343 def test_issue16955(self): 3344 for fam in self.connection.families: 3345 l = self.connection.Listener(family=fam) 3346 c = self.connection.Client(l.address) 3347 a = l.accept() 3348 a.send_bytes(b"hello") 3349 self.assertTrue(c.poll(1)) 3350 a.close() 3351 c.close() 3352 l.close() 3353 3354class _TestPoll(BaseTestCase): 3355 3356 ALLOWED_TYPES = ('processes', 'threads') 3357 3358 def test_empty_string(self): 3359 a, b = self.Pipe() 3360 self.assertEqual(a.poll(), False) 3361 b.send_bytes(b'') 3362 self.assertEqual(a.poll(), True) 3363 self.assertEqual(a.poll(), True) 3364 3365 @classmethod 3366 def _child_strings(cls, conn, strings): 3367 for s in strings: 3368 time.sleep(0.1) 3369 conn.send_bytes(s) 3370 conn.close() 3371 3372 def test_strings(self): 3373 strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop') 3374 a, b = self.Pipe() 3375 p = self.Process(target=self._child_strings, args=(b, strings)) 3376 p.start() 3377 3378 for s in strings: 3379 for i in range(200): 3380 if a.poll(0.01): 3381 break 3382 x = a.recv_bytes() 3383 self.assertEqual(s, x) 3384 3385 p.join() 3386 3387 @classmethod 3388 def _child_boundaries(cls, r): 3389 # Polling may "pull" a message in to the child process, but we 3390 # don't want it to pull only part of a message, as that would 3391 # corrupt the pipe for any other processes which might later 3392 # read from it. 3393 r.poll(5) 3394 3395 def test_boundaries(self): 3396 r, w = self.Pipe(False) 3397 p = self.Process(target=self._child_boundaries, args=(r,)) 3398 p.start() 3399 time.sleep(2) 3400 L = [b"first", b"second"] 3401 for obj in L: 3402 w.send_bytes(obj) 3403 w.close() 3404 p.join() 3405 self.assertIn(r.recv_bytes(), L) 3406 3407 @classmethod 3408 def _child_dont_merge(cls, b): 3409 b.send_bytes(b'a') 3410 b.send_bytes(b'b') 3411 b.send_bytes(b'cd') 3412 3413 def test_dont_merge(self): 3414 a, b = self.Pipe() 3415 self.assertEqual(a.poll(0.0), False) 3416 self.assertEqual(a.poll(0.1), False) 3417 3418 p = self.Process(target=self._child_dont_merge, args=(b,)) 3419 p.start() 3420 3421 self.assertEqual(a.recv_bytes(), b'a') 3422 self.assertEqual(a.poll(1.0), True) 3423 self.assertEqual(a.poll(1.0), True) 3424 self.assertEqual(a.recv_bytes(), b'b') 3425 self.assertEqual(a.poll(1.0), True) 3426 self.assertEqual(a.poll(1.0), True) 3427 self.assertEqual(a.poll(0.0), True) 3428 self.assertEqual(a.recv_bytes(), b'cd') 3429 3430 p.join() 3431 3432# 3433# Test of sending connection and socket objects between processes 3434# 3435 3436@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 3437class _TestPicklingConnections(BaseTestCase): 3438 3439 ALLOWED_TYPES = ('processes',) 3440 3441 @classmethod 3442 def tearDownClass(cls): 3443 from multiprocessing import resource_sharer 3444 resource_sharer.stop(timeout=TIMEOUT) 3445 3446 @classmethod 3447 def _listener(cls, conn, families): 3448 for fam in families: 3449 l = cls.connection.Listener(family=fam) 3450 conn.send(l.address) 3451 new_conn = l.accept() 3452 conn.send(new_conn) 3453 new_conn.close() 3454 l.close() 3455 3456 l = socket.create_server((test.support.HOST, 0)) 3457 conn.send(l.getsockname()) 3458 new_conn, addr = l.accept() 3459 conn.send(new_conn) 3460 new_conn.close() 3461 l.close() 3462 3463 conn.recv() 3464 3465 @classmethod 3466 def _remote(cls, conn): 3467 for (address, msg) in iter(conn.recv, None): 3468 client = cls.connection.Client(address) 3469 client.send(msg.upper()) 3470 client.close() 3471 3472 address, msg = conn.recv() 3473 client = socket.socket() 3474 client.connect(address) 3475 client.sendall(msg.upper()) 3476 client.close() 3477 3478 conn.close() 3479 3480 def test_pickling(self): 3481 families = self.connection.families 3482 3483 lconn, lconn0 = self.Pipe() 3484 lp = self.Process(target=self._listener, args=(lconn0, families)) 3485 lp.daemon = True 3486 lp.start() 3487 lconn0.close() 3488 3489 rconn, rconn0 = self.Pipe() 3490 rp = self.Process(target=self._remote, args=(rconn0,)) 3491 rp.daemon = True 3492 rp.start() 3493 rconn0.close() 3494 3495 for fam in families: 3496 msg = ('This connection uses family %s' % fam).encode('ascii') 3497 address = lconn.recv() 3498 rconn.send((address, msg)) 3499 new_conn = lconn.recv() 3500 self.assertEqual(new_conn.recv(), msg.upper()) 3501 3502 rconn.send(None) 3503 3504 msg = latin('This connection uses a normal socket') 3505 address = lconn.recv() 3506 rconn.send((address, msg)) 3507 new_conn = lconn.recv() 3508 buf = [] 3509 while True: 3510 s = new_conn.recv(100) 3511 if not s: 3512 break 3513 buf.append(s) 3514 buf = b''.join(buf) 3515 self.assertEqual(buf, msg.upper()) 3516 new_conn.close() 3517 3518 lconn.send(None) 3519 3520 rconn.close() 3521 lconn.close() 3522 3523 lp.join() 3524 rp.join() 3525 3526 @classmethod 3527 def child_access(cls, conn): 3528 w = conn.recv() 3529 w.send('all is well') 3530 w.close() 3531 3532 r = conn.recv() 3533 msg = r.recv() 3534 conn.send(msg*2) 3535 3536 conn.close() 3537 3538 def test_access(self): 3539 # On Windows, if we do not specify a destination pid when 3540 # using DupHandle then we need to be careful to use the 3541 # correct access flags for DuplicateHandle(), or else 3542 # DupHandle.detach() will raise PermissionError. For example, 3543 # for a read only pipe handle we should use 3544 # access=FILE_GENERIC_READ. (Unfortunately 3545 # DUPLICATE_SAME_ACCESS does not work.) 3546 conn, child_conn = self.Pipe() 3547 p = self.Process(target=self.child_access, args=(child_conn,)) 3548 p.daemon = True 3549 p.start() 3550 child_conn.close() 3551 3552 r, w = self.Pipe(duplex=False) 3553 conn.send(w) 3554 w.close() 3555 self.assertEqual(r.recv(), 'all is well') 3556 r.close() 3557 3558 r, w = self.Pipe(duplex=False) 3559 conn.send(r) 3560 r.close() 3561 w.send('foobar') 3562 w.close() 3563 self.assertEqual(conn.recv(), 'foobar'*2) 3564 3565 p.join() 3566 3567# 3568# 3569# 3570 3571class _TestHeap(BaseTestCase): 3572 3573 ALLOWED_TYPES = ('processes',) 3574 3575 def setUp(self): 3576 super().setUp() 3577 # Make pristine heap for these tests 3578 self.old_heap = multiprocessing.heap.BufferWrapper._heap 3579 multiprocessing.heap.BufferWrapper._heap = multiprocessing.heap.Heap() 3580 3581 def tearDown(self): 3582 multiprocessing.heap.BufferWrapper._heap = self.old_heap 3583 super().tearDown() 3584 3585 def test_heap(self): 3586 iterations = 5000 3587 maxblocks = 50 3588 blocks = [] 3589 3590 # get the heap object 3591 heap = multiprocessing.heap.BufferWrapper._heap 3592 heap._DISCARD_FREE_SPACE_LARGER_THAN = 0 3593 3594 # create and destroy lots of blocks of different sizes 3595 for i in range(iterations): 3596 size = int(random.lognormvariate(0, 1) * 1000) 3597 b = multiprocessing.heap.BufferWrapper(size) 3598 blocks.append(b) 3599 if len(blocks) > maxblocks: 3600 i = random.randrange(maxblocks) 3601 del blocks[i] 3602 del b 3603 3604 # verify the state of the heap 3605 with heap._lock: 3606 all = [] 3607 free = 0 3608 occupied = 0 3609 for L in list(heap._len_to_seq.values()): 3610 # count all free blocks in arenas 3611 for arena, start, stop in L: 3612 all.append((heap._arenas.index(arena), start, stop, 3613 stop-start, 'free')) 3614 free += (stop-start) 3615 for arena, arena_blocks in heap._allocated_blocks.items(): 3616 # count all allocated blocks in arenas 3617 for start, stop in arena_blocks: 3618 all.append((heap._arenas.index(arena), start, stop, 3619 stop-start, 'occupied')) 3620 occupied += (stop-start) 3621 3622 self.assertEqual(free + occupied, 3623 sum(arena.size for arena in heap._arenas)) 3624 3625 all.sort() 3626 3627 for i in range(len(all)-1): 3628 (arena, start, stop) = all[i][:3] 3629 (narena, nstart, nstop) = all[i+1][:3] 3630 if arena != narena: 3631 # Two different arenas 3632 self.assertEqual(stop, heap._arenas[arena].size) # last block 3633 self.assertEqual(nstart, 0) # first block 3634 else: 3635 # Same arena: two adjacent blocks 3636 self.assertEqual(stop, nstart) 3637 3638 # test free'ing all blocks 3639 random.shuffle(blocks) 3640 while blocks: 3641 blocks.pop() 3642 3643 self.assertEqual(heap._n_frees, heap._n_mallocs) 3644 self.assertEqual(len(heap._pending_free_blocks), 0) 3645 self.assertEqual(len(heap._arenas), 0) 3646 self.assertEqual(len(heap._allocated_blocks), 0, heap._allocated_blocks) 3647 self.assertEqual(len(heap._len_to_seq), 0) 3648 3649 def test_free_from_gc(self): 3650 # Check that freeing of blocks by the garbage collector doesn't deadlock 3651 # (issue #12352). 3652 # Make sure the GC is enabled, and set lower collection thresholds to 3653 # make collections more frequent (and increase the probability of 3654 # deadlock). 3655 if not gc.isenabled(): 3656 gc.enable() 3657 self.addCleanup(gc.disable) 3658 thresholds = gc.get_threshold() 3659 self.addCleanup(gc.set_threshold, *thresholds) 3660 gc.set_threshold(10) 3661 3662 # perform numerous block allocations, with cyclic references to make 3663 # sure objects are collected asynchronously by the gc 3664 for i in range(5000): 3665 a = multiprocessing.heap.BufferWrapper(1) 3666 b = multiprocessing.heap.BufferWrapper(1) 3667 # circular references 3668 a.buddy = b 3669 b.buddy = a 3670 3671# 3672# 3673# 3674 3675class _Foo(Structure): 3676 _fields_ = [ 3677 ('x', c_int), 3678 ('y', c_double), 3679 ('z', c_longlong,) 3680 ] 3681 3682class _TestSharedCTypes(BaseTestCase): 3683 3684 ALLOWED_TYPES = ('processes',) 3685 3686 def setUp(self): 3687 if not HAS_SHAREDCTYPES: 3688 self.skipTest("requires multiprocessing.sharedctypes") 3689 3690 @classmethod 3691 def _double(cls, x, y, z, foo, arr, string): 3692 x.value *= 2 3693 y.value *= 2 3694 z.value *= 2 3695 foo.x *= 2 3696 foo.y *= 2 3697 string.value *= 2 3698 for i in range(len(arr)): 3699 arr[i] *= 2 3700 3701 def test_sharedctypes(self, lock=False): 3702 x = Value('i', 7, lock=lock) 3703 y = Value(c_double, 1.0/3.0, lock=lock) 3704 z = Value(c_longlong, 2 ** 33, lock=lock) 3705 foo = Value(_Foo, 3, 2, lock=lock) 3706 arr = self.Array('d', list(range(10)), lock=lock) 3707 string = self.Array('c', 20, lock=lock) 3708 string.value = latin('hello') 3709 3710 p = self.Process(target=self._double, args=(x, y, z, foo, arr, string)) 3711 p.daemon = True 3712 p.start() 3713 p.join() 3714 3715 self.assertEqual(x.value, 14) 3716 self.assertAlmostEqual(y.value, 2.0/3.0) 3717 self.assertEqual(z.value, 2 ** 34) 3718 self.assertEqual(foo.x, 6) 3719 self.assertAlmostEqual(foo.y, 4.0) 3720 for i in range(10): 3721 self.assertAlmostEqual(arr[i], i*2) 3722 self.assertEqual(string.value, latin('hellohello')) 3723 3724 def test_synchronize(self): 3725 self.test_sharedctypes(lock=True) 3726 3727 def test_copy(self): 3728 foo = _Foo(2, 5.0, 2 ** 33) 3729 bar = copy(foo) 3730 foo.x = 0 3731 foo.y = 0 3732 foo.z = 0 3733 self.assertEqual(bar.x, 2) 3734 self.assertAlmostEqual(bar.y, 5.0) 3735 self.assertEqual(bar.z, 2 ** 33) 3736 3737 3738@unittest.skipUnless(HAS_SHMEM, "requires multiprocessing.shared_memory") 3739class _TestSharedMemory(BaseTestCase): 3740 3741 ALLOWED_TYPES = ('processes',) 3742 3743 @staticmethod 3744 def _attach_existing_shmem_then_write(shmem_name_or_obj, binary_data): 3745 if isinstance(shmem_name_or_obj, str): 3746 local_sms = shared_memory.SharedMemory(shmem_name_or_obj) 3747 else: 3748 local_sms = shmem_name_or_obj 3749 local_sms.buf[:len(binary_data)] = binary_data 3750 local_sms.close() 3751 3752 def test_shared_memory_basics(self): 3753 sms = shared_memory.SharedMemory('test01_tsmb', create=True, size=512) 3754 self.addCleanup(sms.unlink) 3755 3756 # Verify attributes are readable. 3757 self.assertEqual(sms.name, 'test01_tsmb') 3758 self.assertGreaterEqual(sms.size, 512) 3759 self.assertGreaterEqual(len(sms.buf), sms.size) 3760 3761 # Modify contents of shared memory segment through memoryview. 3762 sms.buf[0] = 42 3763 self.assertEqual(sms.buf[0], 42) 3764 3765 # Attach to existing shared memory segment. 3766 also_sms = shared_memory.SharedMemory('test01_tsmb') 3767 self.assertEqual(also_sms.buf[0], 42) 3768 also_sms.close() 3769 3770 # Attach to existing shared memory segment but specify a new size. 3771 same_sms = shared_memory.SharedMemory('test01_tsmb', size=20*sms.size) 3772 self.assertLess(same_sms.size, 20*sms.size) # Size was ignored. 3773 same_sms.close() 3774 3775 if shared_memory._USE_POSIX: 3776 # Posix Shared Memory can only be unlinked once. Here we 3777 # test an implementation detail that is not observed across 3778 # all supported platforms (since WindowsNamedSharedMemory 3779 # manages unlinking on its own and unlink() does nothing). 3780 # True release of shared memory segment does not necessarily 3781 # happen until process exits, depending on the OS platform. 3782 with self.assertRaises(FileNotFoundError): 3783 sms_uno = shared_memory.SharedMemory( 3784 'test01_dblunlink', 3785 create=True, 3786 size=5000 3787 ) 3788 3789 try: 3790 self.assertGreaterEqual(sms_uno.size, 5000) 3791 3792 sms_duo = shared_memory.SharedMemory('test01_dblunlink') 3793 sms_duo.unlink() # First shm_unlink() call. 3794 sms_duo.close() 3795 sms_uno.close() 3796 3797 finally: 3798 sms_uno.unlink() # A second shm_unlink() call is bad. 3799 3800 with self.assertRaises(FileExistsError): 3801 # Attempting to create a new shared memory segment with a 3802 # name that is already in use triggers an exception. 3803 there_can_only_be_one_sms = shared_memory.SharedMemory( 3804 'test01_tsmb', 3805 create=True, 3806 size=512 3807 ) 3808 3809 if shared_memory._USE_POSIX: 3810 # Requesting creation of a shared memory segment with the option 3811 # to attach to an existing segment, if that name is currently in 3812 # use, should not trigger an exception. 3813 # Note: Using a smaller size could possibly cause truncation of 3814 # the existing segment but is OS platform dependent. In the 3815 # case of MacOS/darwin, requesting a smaller size is disallowed. 3816 class OptionalAttachSharedMemory(shared_memory.SharedMemory): 3817 _flags = os.O_CREAT | os.O_RDWR 3818 ok_if_exists_sms = OptionalAttachSharedMemory('test01_tsmb') 3819 self.assertEqual(ok_if_exists_sms.size, sms.size) 3820 ok_if_exists_sms.close() 3821 3822 # Attempting to attach to an existing shared memory segment when 3823 # no segment exists with the supplied name triggers an exception. 3824 with self.assertRaises(FileNotFoundError): 3825 nonexisting_sms = shared_memory.SharedMemory('test01_notthere') 3826 nonexisting_sms.unlink() # Error should occur on prior line. 3827 3828 sms.close() 3829 3830 def test_shared_memory_across_processes(self): 3831 # bpo-40135: don't define shared memory block's name in case of 3832 # the failure when we run multiprocessing tests in parallel. 3833 sms = shared_memory.SharedMemory(create=True, size=512) 3834 self.addCleanup(sms.unlink) 3835 3836 # Verify remote attachment to existing block by name is working. 3837 p = self.Process( 3838 target=self._attach_existing_shmem_then_write, 3839 args=(sms.name, b'howdy') 3840 ) 3841 p.daemon = True 3842 p.start() 3843 p.join() 3844 self.assertEqual(bytes(sms.buf[:5]), b'howdy') 3845 3846 # Verify pickling of SharedMemory instance also works. 3847 p = self.Process( 3848 target=self._attach_existing_shmem_then_write, 3849 args=(sms, b'HELLO') 3850 ) 3851 p.daemon = True 3852 p.start() 3853 p.join() 3854 self.assertEqual(bytes(sms.buf[:5]), b'HELLO') 3855 3856 sms.close() 3857 3858 @unittest.skipIf(os.name != "posix", "not feasible in non-posix platforms") 3859 def test_shared_memory_SharedMemoryServer_ignores_sigint(self): 3860 # bpo-36368: protect SharedMemoryManager server process from 3861 # KeyboardInterrupt signals. 3862 smm = multiprocessing.managers.SharedMemoryManager() 3863 smm.start() 3864 3865 # make sure the manager works properly at the beginning 3866 sl = smm.ShareableList(range(10)) 3867 3868 # the manager's server should ignore KeyboardInterrupt signals, and 3869 # maintain its connection with the current process, and success when 3870 # asked to deliver memory segments. 3871 os.kill(smm._process.pid, signal.SIGINT) 3872 3873 sl2 = smm.ShareableList(range(10)) 3874 3875 # test that the custom signal handler registered in the Manager does 3876 # not affect signal handling in the parent process. 3877 with self.assertRaises(KeyboardInterrupt): 3878 os.kill(os.getpid(), signal.SIGINT) 3879 3880 smm.shutdown() 3881 3882 @unittest.skipIf(os.name != "posix", "resource_tracker is posix only") 3883 def test_shared_memory_SharedMemoryManager_reuses_resource_tracker(self): 3884 # bpo-36867: test that a SharedMemoryManager uses the 3885 # same resource_tracker process as its parent. 3886 cmd = '''if 1: 3887 from multiprocessing.managers import SharedMemoryManager 3888 3889 3890 smm = SharedMemoryManager() 3891 smm.start() 3892 sl = smm.ShareableList(range(10)) 3893 smm.shutdown() 3894 ''' 3895 rc, out, err = test.support.script_helper.assert_python_ok('-c', cmd) 3896 3897 # Before bpo-36867 was fixed, a SharedMemoryManager not using the same 3898 # resource_tracker process as its parent would make the parent's 3899 # tracker complain about sl being leaked even though smm.shutdown() 3900 # properly released sl. 3901 self.assertFalse(err) 3902 3903 def test_shared_memory_SharedMemoryManager_basics(self): 3904 smm1 = multiprocessing.managers.SharedMemoryManager() 3905 with self.assertRaises(ValueError): 3906 smm1.SharedMemory(size=9) # Fails if SharedMemoryServer not started 3907 smm1.start() 3908 lol = [ smm1.ShareableList(range(i)) for i in range(5, 10) ] 3909 lom = [ smm1.SharedMemory(size=j) for j in range(32, 128, 16) ] 3910 doppleganger_list0 = shared_memory.ShareableList(name=lol[0].shm.name) 3911 self.assertEqual(len(doppleganger_list0), 5) 3912 doppleganger_shm0 = shared_memory.SharedMemory(name=lom[0].name) 3913 self.assertGreaterEqual(len(doppleganger_shm0.buf), 32) 3914 held_name = lom[0].name 3915 smm1.shutdown() 3916 if sys.platform != "win32": 3917 # Calls to unlink() have no effect on Windows platform; shared 3918 # memory will only be released once final process exits. 3919 with self.assertRaises(FileNotFoundError): 3920 # No longer there to be attached to again. 3921 absent_shm = shared_memory.SharedMemory(name=held_name) 3922 3923 with multiprocessing.managers.SharedMemoryManager() as smm2: 3924 sl = smm2.ShareableList("howdy") 3925 shm = smm2.SharedMemory(size=128) 3926 held_name = sl.shm.name 3927 if sys.platform != "win32": 3928 with self.assertRaises(FileNotFoundError): 3929 # No longer there to be attached to again. 3930 absent_sl = shared_memory.ShareableList(name=held_name) 3931 3932 3933 def test_shared_memory_ShareableList_basics(self): 3934 sl = shared_memory.ShareableList( 3935 ['howdy', b'HoWdY', -273.154, 100, None, True, 42] 3936 ) 3937 self.addCleanup(sl.shm.unlink) 3938 3939 # Verify attributes are readable. 3940 self.assertEqual(sl.format, '8s8sdqxxxxxx?xxxxxxxx?q') 3941 3942 # Exercise len(). 3943 self.assertEqual(len(sl), 7) 3944 3945 # Exercise index(). 3946 with warnings.catch_warnings(): 3947 # Suppress BytesWarning when comparing against b'HoWdY'. 3948 warnings.simplefilter('ignore') 3949 with self.assertRaises(ValueError): 3950 sl.index('100') 3951 self.assertEqual(sl.index(100), 3) 3952 3953 # Exercise retrieving individual values. 3954 self.assertEqual(sl[0], 'howdy') 3955 self.assertEqual(sl[-2], True) 3956 3957 # Exercise iterability. 3958 self.assertEqual( 3959 tuple(sl), 3960 ('howdy', b'HoWdY', -273.154, 100, None, True, 42) 3961 ) 3962 3963 # Exercise modifying individual values. 3964 sl[3] = 42 3965 self.assertEqual(sl[3], 42) 3966 sl[4] = 'some' # Change type at a given position. 3967 self.assertEqual(sl[4], 'some') 3968 self.assertEqual(sl.format, '8s8sdq8sxxxxxxx?q') 3969 with self.assertRaisesRegex(ValueError, 3970 "exceeds available storage"): 3971 sl[4] = 'far too many' 3972 self.assertEqual(sl[4], 'some') 3973 sl[0] = 'encodés' # Exactly 8 bytes of UTF-8 data 3974 self.assertEqual(sl[0], 'encodés') 3975 self.assertEqual(sl[1], b'HoWdY') # no spillage 3976 with self.assertRaisesRegex(ValueError, 3977 "exceeds available storage"): 3978 sl[0] = 'encodées' # Exactly 9 bytes of UTF-8 data 3979 self.assertEqual(sl[1], b'HoWdY') 3980 with self.assertRaisesRegex(ValueError, 3981 "exceeds available storage"): 3982 sl[1] = b'123456789' 3983 self.assertEqual(sl[1], b'HoWdY') 3984 3985 # Exercise count(). 3986 with warnings.catch_warnings(): 3987 # Suppress BytesWarning when comparing against b'HoWdY'. 3988 warnings.simplefilter('ignore') 3989 self.assertEqual(sl.count(42), 2) 3990 self.assertEqual(sl.count(b'HoWdY'), 1) 3991 self.assertEqual(sl.count(b'adios'), 0) 3992 3993 # Exercise creating a duplicate. 3994 sl_copy = shared_memory.ShareableList(sl, name='test03_duplicate') 3995 try: 3996 self.assertNotEqual(sl.shm.name, sl_copy.shm.name) 3997 self.assertEqual('test03_duplicate', sl_copy.shm.name) 3998 self.assertEqual(list(sl), list(sl_copy)) 3999 self.assertEqual(sl.format, sl_copy.format) 4000 sl_copy[-1] = 77 4001 self.assertEqual(sl_copy[-1], 77) 4002 self.assertNotEqual(sl[-1], 77) 4003 sl_copy.shm.close() 4004 finally: 4005 sl_copy.shm.unlink() 4006 4007 # Obtain a second handle on the same ShareableList. 4008 sl_tethered = shared_memory.ShareableList(name=sl.shm.name) 4009 self.assertEqual(sl.shm.name, sl_tethered.shm.name) 4010 sl_tethered[-1] = 880 4011 self.assertEqual(sl[-1], 880) 4012 sl_tethered.shm.close() 4013 4014 sl.shm.close() 4015 4016 # Exercise creating an empty ShareableList. 4017 empty_sl = shared_memory.ShareableList() 4018 try: 4019 self.assertEqual(len(empty_sl), 0) 4020 self.assertEqual(empty_sl.format, '') 4021 self.assertEqual(empty_sl.count('any'), 0) 4022 with self.assertRaises(ValueError): 4023 empty_sl.index(None) 4024 empty_sl.shm.close() 4025 finally: 4026 empty_sl.shm.unlink() 4027 4028 def test_shared_memory_ShareableList_pickling(self): 4029 sl = shared_memory.ShareableList(range(10)) 4030 self.addCleanup(sl.shm.unlink) 4031 4032 serialized_sl = pickle.dumps(sl) 4033 deserialized_sl = pickle.loads(serialized_sl) 4034 self.assertTrue( 4035 isinstance(deserialized_sl, shared_memory.ShareableList) 4036 ) 4037 self.assertTrue(deserialized_sl[-1], 9) 4038 self.assertFalse(sl is deserialized_sl) 4039 deserialized_sl[4] = "changed" 4040 self.assertEqual(sl[4], "changed") 4041 4042 # Verify data is not being put into the pickled representation. 4043 name = 'a' * len(sl.shm.name) 4044 larger_sl = shared_memory.ShareableList(range(400)) 4045 self.addCleanup(larger_sl.shm.unlink) 4046 serialized_larger_sl = pickle.dumps(larger_sl) 4047 self.assertTrue(len(serialized_sl) == len(serialized_larger_sl)) 4048 larger_sl.shm.close() 4049 4050 deserialized_sl.shm.close() 4051 sl.shm.close() 4052 4053 def test_shared_memory_cleaned_after_process_termination(self): 4054 cmd = '''if 1: 4055 import os, time, sys 4056 from multiprocessing import shared_memory 4057 4058 # Create a shared_memory segment, and send the segment name 4059 sm = shared_memory.SharedMemory(create=True, size=10) 4060 sys.stdout.write(sm.name + '\\n') 4061 sys.stdout.flush() 4062 time.sleep(100) 4063 ''' 4064 with subprocess.Popen([sys.executable, '-E', '-c', cmd], 4065 stdout=subprocess.PIPE, 4066 stderr=subprocess.PIPE) as p: 4067 name = p.stdout.readline().strip().decode() 4068 4069 # killing abruptly processes holding reference to a shared memory 4070 # segment should not leak the given memory segment. 4071 p.terminate() 4072 p.wait() 4073 4074 deadline = time.monotonic() + 60 4075 t = 0.1 4076 while time.monotonic() < deadline: 4077 time.sleep(t) 4078 t = min(t*2, 5) 4079 try: 4080 smm = shared_memory.SharedMemory(name, create=False) 4081 except FileNotFoundError: 4082 break 4083 else: 4084 raise AssertionError("A SharedMemory segment was leaked after" 4085 " a process was abruptly terminated.") 4086 4087 if os.name == 'posix': 4088 # A warning was emitted by the subprocess' own 4089 # resource_tracker (on Windows, shared memory segments 4090 # are released automatically by the OS). 4091 err = p.stderr.read().decode() 4092 self.assertIn( 4093 "resource_tracker: There appear to be 1 leaked " 4094 "shared_memory objects to clean up at shutdown", err) 4095 4096# 4097# 4098# 4099 4100class _TestFinalize(BaseTestCase): 4101 4102 ALLOWED_TYPES = ('processes',) 4103 4104 def setUp(self): 4105 self.registry_backup = util._finalizer_registry.copy() 4106 util._finalizer_registry.clear() 4107 4108 def tearDown(self): 4109 self.assertFalse(util._finalizer_registry) 4110 util._finalizer_registry.update(self.registry_backup) 4111 4112 @classmethod 4113 def _test_finalize(cls, conn): 4114 class Foo(object): 4115 pass 4116 4117 a = Foo() 4118 util.Finalize(a, conn.send, args=('a',)) 4119 del a # triggers callback for a 4120 4121 b = Foo() 4122 close_b = util.Finalize(b, conn.send, args=('b',)) 4123 close_b() # triggers callback for b 4124 close_b() # does nothing because callback has already been called 4125 del b # does nothing because callback has already been called 4126 4127 c = Foo() 4128 util.Finalize(c, conn.send, args=('c',)) 4129 4130 d10 = Foo() 4131 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1) 4132 4133 d01 = Foo() 4134 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0) 4135 d02 = Foo() 4136 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0) 4137 d03 = Foo() 4138 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0) 4139 4140 util.Finalize(None, conn.send, args=('e',), exitpriority=-10) 4141 4142 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100) 4143 4144 # call multiprocessing's cleanup function then exit process without 4145 # garbage collecting locals 4146 util._exit_function() 4147 conn.close() 4148 os._exit(0) 4149 4150 def test_finalize(self): 4151 conn, child_conn = self.Pipe() 4152 4153 p = self.Process(target=self._test_finalize, args=(child_conn,)) 4154 p.daemon = True 4155 p.start() 4156 p.join() 4157 4158 result = [obj for obj in iter(conn.recv, 'STOP')] 4159 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e']) 4160 4161 def test_thread_safety(self): 4162 # bpo-24484: _run_finalizers() should be thread-safe 4163 def cb(): 4164 pass 4165 4166 class Foo(object): 4167 def __init__(self): 4168 self.ref = self # create reference cycle 4169 # insert finalizer at random key 4170 util.Finalize(self, cb, exitpriority=random.randint(1, 100)) 4171 4172 finish = False 4173 exc = None 4174 4175 def run_finalizers(): 4176 nonlocal exc 4177 while not finish: 4178 time.sleep(random.random() * 1e-1) 4179 try: 4180 # A GC run will eventually happen during this, 4181 # collecting stale Foo's and mutating the registry 4182 util._run_finalizers() 4183 except Exception as e: 4184 exc = e 4185 4186 def make_finalizers(): 4187 nonlocal exc 4188 d = {} 4189 while not finish: 4190 try: 4191 # Old Foo's get gradually replaced and later 4192 # collected by the GC (because of the cyclic ref) 4193 d[random.getrandbits(5)] = {Foo() for i in range(10)} 4194 except Exception as e: 4195 exc = e 4196 d.clear() 4197 4198 old_interval = sys.getswitchinterval() 4199 old_threshold = gc.get_threshold() 4200 try: 4201 sys.setswitchinterval(1e-6) 4202 gc.set_threshold(5, 5, 5) 4203 threads = [threading.Thread(target=run_finalizers), 4204 threading.Thread(target=make_finalizers)] 4205 with test.support.start_threads(threads): 4206 time.sleep(4.0) # Wait a bit to trigger race condition 4207 finish = True 4208 if exc is not None: 4209 raise exc 4210 finally: 4211 sys.setswitchinterval(old_interval) 4212 gc.set_threshold(*old_threshold) 4213 gc.collect() # Collect remaining Foo's 4214 4215 4216# 4217# Test that from ... import * works for each module 4218# 4219 4220class _TestImportStar(unittest.TestCase): 4221 4222 def get_module_names(self): 4223 import glob 4224 folder = os.path.dirname(multiprocessing.__file__) 4225 pattern = os.path.join(glob.escape(folder), '*.py') 4226 files = glob.glob(pattern) 4227 modules = [os.path.splitext(os.path.split(f)[1])[0] for f in files] 4228 modules = ['multiprocessing.' + m for m in modules] 4229 modules.remove('multiprocessing.__init__') 4230 modules.append('multiprocessing') 4231 return modules 4232 4233 def test_import(self): 4234 modules = self.get_module_names() 4235 if sys.platform == 'win32': 4236 modules.remove('multiprocessing.popen_fork') 4237 modules.remove('multiprocessing.popen_forkserver') 4238 modules.remove('multiprocessing.popen_spawn_posix') 4239 else: 4240 modules.remove('multiprocessing.popen_spawn_win32') 4241 if not HAS_REDUCTION: 4242 modules.remove('multiprocessing.popen_forkserver') 4243 4244 if c_int is None: 4245 # This module requires _ctypes 4246 modules.remove('multiprocessing.sharedctypes') 4247 4248 for name in modules: 4249 __import__(name) 4250 mod = sys.modules[name] 4251 self.assertTrue(hasattr(mod, '__all__'), name) 4252 4253 for attr in mod.__all__: 4254 self.assertTrue( 4255 hasattr(mod, attr), 4256 '%r does not have attribute %r' % (mod, attr) 4257 ) 4258 4259# 4260# Quick test that logging works -- does not test logging output 4261# 4262 4263class _TestLogging(BaseTestCase): 4264 4265 ALLOWED_TYPES = ('processes',) 4266 4267 def test_enable_logging(self): 4268 logger = multiprocessing.get_logger() 4269 logger.setLevel(util.SUBWARNING) 4270 self.assertTrue(logger is not None) 4271 logger.debug('this will not be printed') 4272 logger.info('nor will this') 4273 logger.setLevel(LOG_LEVEL) 4274 4275 @classmethod 4276 def _test_level(cls, conn): 4277 logger = multiprocessing.get_logger() 4278 conn.send(logger.getEffectiveLevel()) 4279 4280 def test_level(self): 4281 LEVEL1 = 32 4282 LEVEL2 = 37 4283 4284 logger = multiprocessing.get_logger() 4285 root_logger = logging.getLogger() 4286 root_level = root_logger.level 4287 4288 reader, writer = multiprocessing.Pipe(duplex=False) 4289 4290 logger.setLevel(LEVEL1) 4291 p = self.Process(target=self._test_level, args=(writer,)) 4292 p.start() 4293 self.assertEqual(LEVEL1, reader.recv()) 4294 p.join() 4295 p.close() 4296 4297 logger.setLevel(logging.NOTSET) 4298 root_logger.setLevel(LEVEL2) 4299 p = self.Process(target=self._test_level, args=(writer,)) 4300 p.start() 4301 self.assertEqual(LEVEL2, reader.recv()) 4302 p.join() 4303 p.close() 4304 4305 root_logger.setLevel(root_level) 4306 logger.setLevel(level=LOG_LEVEL) 4307 4308 4309# class _TestLoggingProcessName(BaseTestCase): 4310# 4311# def handle(self, record): 4312# assert record.processName == multiprocessing.current_process().name 4313# self.__handled = True 4314# 4315# def test_logging(self): 4316# handler = logging.Handler() 4317# handler.handle = self.handle 4318# self.__handled = False 4319# # Bypass getLogger() and side-effects 4320# logger = logging.getLoggerClass()( 4321# 'multiprocessing.test.TestLoggingProcessName') 4322# logger.addHandler(handler) 4323# logger.propagate = False 4324# 4325# logger.warn('foo') 4326# assert self.__handled 4327 4328# 4329# Check that Process.join() retries if os.waitpid() fails with EINTR 4330# 4331 4332class _TestPollEintr(BaseTestCase): 4333 4334 ALLOWED_TYPES = ('processes',) 4335 4336 @classmethod 4337 def _killer(cls, pid): 4338 time.sleep(0.1) 4339 os.kill(pid, signal.SIGUSR1) 4340 4341 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') 4342 def test_poll_eintr(self): 4343 got_signal = [False] 4344 def record(*args): 4345 got_signal[0] = True 4346 pid = os.getpid() 4347 oldhandler = signal.signal(signal.SIGUSR1, record) 4348 try: 4349 killer = self.Process(target=self._killer, args=(pid,)) 4350 killer.start() 4351 try: 4352 p = self.Process(target=time.sleep, args=(2,)) 4353 p.start() 4354 p.join() 4355 finally: 4356 killer.join() 4357 self.assertTrue(got_signal[0]) 4358 self.assertEqual(p.exitcode, 0) 4359 finally: 4360 signal.signal(signal.SIGUSR1, oldhandler) 4361 4362# 4363# Test to verify handle verification, see issue 3321 4364# 4365 4366class TestInvalidHandle(unittest.TestCase): 4367 4368 @unittest.skipIf(WIN32, "skipped on Windows") 4369 def test_invalid_handles(self): 4370 conn = multiprocessing.connection.Connection(44977608) 4371 # check that poll() doesn't crash 4372 try: 4373 conn.poll() 4374 except (ValueError, OSError): 4375 pass 4376 finally: 4377 # Hack private attribute _handle to avoid printing an error 4378 # in conn.__del__ 4379 conn._handle = None 4380 self.assertRaises((ValueError, OSError), 4381 multiprocessing.connection.Connection, -1) 4382 4383 4384 4385class OtherTest(unittest.TestCase): 4386 # TODO: add more tests for deliver/answer challenge. 4387 def test_deliver_challenge_auth_failure(self): 4388 class _FakeConnection(object): 4389 def recv_bytes(self, size): 4390 return b'something bogus' 4391 def send_bytes(self, data): 4392 pass 4393 self.assertRaises(multiprocessing.AuthenticationError, 4394 multiprocessing.connection.deliver_challenge, 4395 _FakeConnection(), b'abc') 4396 4397 def test_answer_challenge_auth_failure(self): 4398 class _FakeConnection(object): 4399 def __init__(self): 4400 self.count = 0 4401 def recv_bytes(self, size): 4402 self.count += 1 4403 if self.count == 1: 4404 return multiprocessing.connection.CHALLENGE 4405 elif self.count == 2: 4406 return b'something bogus' 4407 return b'' 4408 def send_bytes(self, data): 4409 pass 4410 self.assertRaises(multiprocessing.AuthenticationError, 4411 multiprocessing.connection.answer_challenge, 4412 _FakeConnection(), b'abc') 4413 4414# 4415# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585 4416# 4417 4418def initializer(ns): 4419 ns.test += 1 4420 4421class TestInitializers(unittest.TestCase): 4422 def setUp(self): 4423 self.mgr = multiprocessing.Manager() 4424 self.ns = self.mgr.Namespace() 4425 self.ns.test = 0 4426 4427 def tearDown(self): 4428 self.mgr.shutdown() 4429 self.mgr.join() 4430 4431 def test_manager_initializer(self): 4432 m = multiprocessing.managers.SyncManager() 4433 self.assertRaises(TypeError, m.start, 1) 4434 m.start(initializer, (self.ns,)) 4435 self.assertEqual(self.ns.test, 1) 4436 m.shutdown() 4437 m.join() 4438 4439 def test_pool_initializer(self): 4440 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1) 4441 p = multiprocessing.Pool(1, initializer, (self.ns,)) 4442 p.close() 4443 p.join() 4444 self.assertEqual(self.ns.test, 1) 4445 4446# 4447# Issue 5155, 5313, 5331: Test process in processes 4448# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior 4449# 4450 4451def _this_sub_process(q): 4452 try: 4453 item = q.get(block=False) 4454 except pyqueue.Empty: 4455 pass 4456 4457def _test_process(): 4458 queue = multiprocessing.Queue() 4459 subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,)) 4460 subProc.daemon = True 4461 subProc.start() 4462 subProc.join() 4463 4464def _afunc(x): 4465 return x*x 4466 4467def pool_in_process(): 4468 pool = multiprocessing.Pool(processes=4) 4469 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7]) 4470 pool.close() 4471 pool.join() 4472 4473class _file_like(object): 4474 def __init__(self, delegate): 4475 self._delegate = delegate 4476 self._pid = None 4477 4478 @property 4479 def cache(self): 4480 pid = os.getpid() 4481 # There are no race conditions since fork keeps only the running thread 4482 if pid != self._pid: 4483 self._pid = pid 4484 self._cache = [] 4485 return self._cache 4486 4487 def write(self, data): 4488 self.cache.append(data) 4489 4490 def flush(self): 4491 self._delegate.write(''.join(self.cache)) 4492 self._cache = [] 4493 4494class TestStdinBadfiledescriptor(unittest.TestCase): 4495 4496 def test_queue_in_process(self): 4497 proc = multiprocessing.Process(target=_test_process) 4498 proc.start() 4499 proc.join() 4500 4501 def test_pool_in_process(self): 4502 p = multiprocessing.Process(target=pool_in_process) 4503 p.start() 4504 p.join() 4505 4506 def test_flushing(self): 4507 sio = io.StringIO() 4508 flike = _file_like(sio) 4509 flike.write('foo') 4510 proc = multiprocessing.Process(target=lambda: flike.flush()) 4511 flike.flush() 4512 assert sio.getvalue() == 'foo' 4513 4514 4515class TestWait(unittest.TestCase): 4516 4517 @classmethod 4518 def _child_test_wait(cls, w, slow): 4519 for i in range(10): 4520 if slow: 4521 time.sleep(random.random()*0.1) 4522 w.send((i, os.getpid())) 4523 w.close() 4524 4525 def test_wait(self, slow=False): 4526 from multiprocessing.connection import wait 4527 readers = [] 4528 procs = [] 4529 messages = [] 4530 4531 for i in range(4): 4532 r, w = multiprocessing.Pipe(duplex=False) 4533 p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow)) 4534 p.daemon = True 4535 p.start() 4536 w.close() 4537 readers.append(r) 4538 procs.append(p) 4539 self.addCleanup(p.join) 4540 4541 while readers: 4542 for r in wait(readers): 4543 try: 4544 msg = r.recv() 4545 except EOFError: 4546 readers.remove(r) 4547 r.close() 4548 else: 4549 messages.append(msg) 4550 4551 messages.sort() 4552 expected = sorted((i, p.pid) for i in range(10) for p in procs) 4553 self.assertEqual(messages, expected) 4554 4555 @classmethod 4556 def _child_test_wait_socket(cls, address, slow): 4557 s = socket.socket() 4558 s.connect(address) 4559 for i in range(10): 4560 if slow: 4561 time.sleep(random.random()*0.1) 4562 s.sendall(('%s\n' % i).encode('ascii')) 4563 s.close() 4564 4565 def test_wait_socket(self, slow=False): 4566 from multiprocessing.connection import wait 4567 l = socket.create_server((test.support.HOST, 0)) 4568 addr = l.getsockname() 4569 readers = [] 4570 procs = [] 4571 dic = {} 4572 4573 for i in range(4): 4574 p = multiprocessing.Process(target=self._child_test_wait_socket, 4575 args=(addr, slow)) 4576 p.daemon = True 4577 p.start() 4578 procs.append(p) 4579 self.addCleanup(p.join) 4580 4581 for i in range(4): 4582 r, _ = l.accept() 4583 readers.append(r) 4584 dic[r] = [] 4585 l.close() 4586 4587 while readers: 4588 for r in wait(readers): 4589 msg = r.recv(32) 4590 if not msg: 4591 readers.remove(r) 4592 r.close() 4593 else: 4594 dic[r].append(msg) 4595 4596 expected = ''.join('%s\n' % i for i in range(10)).encode('ascii') 4597 for v in dic.values(): 4598 self.assertEqual(b''.join(v), expected) 4599 4600 def test_wait_slow(self): 4601 self.test_wait(True) 4602 4603 def test_wait_socket_slow(self): 4604 self.test_wait_socket(True) 4605 4606 def test_wait_timeout(self): 4607 from multiprocessing.connection import wait 4608 4609 expected = 5 4610 a, b = multiprocessing.Pipe() 4611 4612 start = time.monotonic() 4613 res = wait([a, b], expected) 4614 delta = time.monotonic() - start 4615 4616 self.assertEqual(res, []) 4617 self.assertLess(delta, expected * 2) 4618 self.assertGreater(delta, expected * 0.5) 4619 4620 b.send(None) 4621 4622 start = time.monotonic() 4623 res = wait([a, b], 20) 4624 delta = time.monotonic() - start 4625 4626 self.assertEqual(res, [a]) 4627 self.assertLess(delta, 0.4) 4628 4629 @classmethod 4630 def signal_and_sleep(cls, sem, period): 4631 sem.release() 4632 time.sleep(period) 4633 4634 def test_wait_integer(self): 4635 from multiprocessing.connection import wait 4636 4637 expected = 3 4638 sorted_ = lambda l: sorted(l, key=lambda x: id(x)) 4639 sem = multiprocessing.Semaphore(0) 4640 a, b = multiprocessing.Pipe() 4641 p = multiprocessing.Process(target=self.signal_and_sleep, 4642 args=(sem, expected)) 4643 4644 p.start() 4645 self.assertIsInstance(p.sentinel, int) 4646 self.assertTrue(sem.acquire(timeout=20)) 4647 4648 start = time.monotonic() 4649 res = wait([a, p.sentinel, b], expected + 20) 4650 delta = time.monotonic() - start 4651 4652 self.assertEqual(res, [p.sentinel]) 4653 self.assertLess(delta, expected + 2) 4654 self.assertGreater(delta, expected - 2) 4655 4656 a.send(None) 4657 4658 start = time.monotonic() 4659 res = wait([a, p.sentinel, b], 20) 4660 delta = time.monotonic() - start 4661 4662 self.assertEqual(sorted_(res), sorted_([p.sentinel, b])) 4663 self.assertLess(delta, 0.4) 4664 4665 b.send(None) 4666 4667 start = time.monotonic() 4668 res = wait([a, p.sentinel, b], 20) 4669 delta = time.monotonic() - start 4670 4671 self.assertEqual(sorted_(res), sorted_([a, p.sentinel, b])) 4672 self.assertLess(delta, 0.4) 4673 4674 p.terminate() 4675 p.join() 4676 4677 def test_neg_timeout(self): 4678 from multiprocessing.connection import wait 4679 a, b = multiprocessing.Pipe() 4680 t = time.monotonic() 4681 res = wait([a], timeout=-1) 4682 t = time.monotonic() - t 4683 self.assertEqual(res, []) 4684 self.assertLess(t, 1) 4685 a.close() 4686 b.close() 4687 4688# 4689# Issue 14151: Test invalid family on invalid environment 4690# 4691 4692class TestInvalidFamily(unittest.TestCase): 4693 4694 @unittest.skipIf(WIN32, "skipped on Windows") 4695 def test_invalid_family(self): 4696 with self.assertRaises(ValueError): 4697 multiprocessing.connection.Listener(r'\\.\test') 4698 4699 @unittest.skipUnless(WIN32, "skipped on non-Windows platforms") 4700 def test_invalid_family_win32(self): 4701 with self.assertRaises(ValueError): 4702 multiprocessing.connection.Listener('/var/test.pipe') 4703 4704# 4705# Issue 12098: check sys.flags of child matches that for parent 4706# 4707 4708class TestFlags(unittest.TestCase): 4709 @classmethod 4710 def run_in_grandchild(cls, conn): 4711 conn.send(tuple(sys.flags)) 4712 4713 @classmethod 4714 def run_in_child(cls): 4715 import json 4716 r, w = multiprocessing.Pipe(duplex=False) 4717 p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,)) 4718 p.start() 4719 grandchild_flags = r.recv() 4720 p.join() 4721 r.close() 4722 w.close() 4723 flags = (tuple(sys.flags), grandchild_flags) 4724 print(json.dumps(flags)) 4725 4726 def test_flags(self): 4727 import json 4728 # start child process using unusual flags 4729 prog = ('from test._test_multiprocessing import TestFlags; ' + 4730 'TestFlags.run_in_child()') 4731 data = subprocess.check_output( 4732 [sys.executable, '-E', '-S', '-O', '-c', prog]) 4733 child_flags, grandchild_flags = json.loads(data.decode('ascii')) 4734 self.assertEqual(child_flags, grandchild_flags) 4735 4736# 4737# Test interaction with socket timeouts - see Issue #6056 4738# 4739 4740class TestTimeouts(unittest.TestCase): 4741 @classmethod 4742 def _test_timeout(cls, child, address): 4743 time.sleep(1) 4744 child.send(123) 4745 child.close() 4746 conn = multiprocessing.connection.Client(address) 4747 conn.send(456) 4748 conn.close() 4749 4750 def test_timeout(self): 4751 old_timeout = socket.getdefaulttimeout() 4752 try: 4753 socket.setdefaulttimeout(0.1) 4754 parent, child = multiprocessing.Pipe(duplex=True) 4755 l = multiprocessing.connection.Listener(family='AF_INET') 4756 p = multiprocessing.Process(target=self._test_timeout, 4757 args=(child, l.address)) 4758 p.start() 4759 child.close() 4760 self.assertEqual(parent.recv(), 123) 4761 parent.close() 4762 conn = l.accept() 4763 self.assertEqual(conn.recv(), 456) 4764 conn.close() 4765 l.close() 4766 join_process(p) 4767 finally: 4768 socket.setdefaulttimeout(old_timeout) 4769 4770# 4771# Test what happens with no "if __name__ == '__main__'" 4772# 4773 4774class TestNoForkBomb(unittest.TestCase): 4775 def test_noforkbomb(self): 4776 sm = multiprocessing.get_start_method() 4777 name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py') 4778 if sm != 'fork': 4779 rc, out, err = test.support.script_helper.assert_python_failure(name, sm) 4780 self.assertEqual(out, b'') 4781 self.assertIn(b'RuntimeError', err) 4782 else: 4783 rc, out, err = test.support.script_helper.assert_python_ok(name, sm) 4784 self.assertEqual(out.rstrip(), b'123') 4785 self.assertEqual(err, b'') 4786 4787# 4788# Issue #17555: ForkAwareThreadLock 4789# 4790 4791class TestForkAwareThreadLock(unittest.TestCase): 4792 # We recursively start processes. Issue #17555 meant that the 4793 # after fork registry would get duplicate entries for the same 4794 # lock. The size of the registry at generation n was ~2**n. 4795 4796 @classmethod 4797 def child(cls, n, conn): 4798 if n > 1: 4799 p = multiprocessing.Process(target=cls.child, args=(n-1, conn)) 4800 p.start() 4801 conn.close() 4802 join_process(p) 4803 else: 4804 conn.send(len(util._afterfork_registry)) 4805 conn.close() 4806 4807 def test_lock(self): 4808 r, w = multiprocessing.Pipe(False) 4809 l = util.ForkAwareThreadLock() 4810 old_size = len(util._afterfork_registry) 4811 p = multiprocessing.Process(target=self.child, args=(5, w)) 4812 p.start() 4813 w.close() 4814 new_size = r.recv() 4815 join_process(p) 4816 self.assertLessEqual(new_size, old_size) 4817 4818# 4819# Check that non-forked child processes do not inherit unneeded fds/handles 4820# 4821 4822class TestCloseFds(unittest.TestCase): 4823 4824 def get_high_socket_fd(self): 4825 if WIN32: 4826 # The child process will not have any socket handles, so 4827 # calling socket.fromfd() should produce WSAENOTSOCK even 4828 # if there is a handle of the same number. 4829 return socket.socket().detach() 4830 else: 4831 # We want to produce a socket with an fd high enough that a 4832 # freshly created child process will not have any fds as high. 4833 fd = socket.socket().detach() 4834 to_close = [] 4835 while fd < 50: 4836 to_close.append(fd) 4837 fd = os.dup(fd) 4838 for x in to_close: 4839 os.close(x) 4840 return fd 4841 4842 def close(self, fd): 4843 if WIN32: 4844 socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=fd).close() 4845 else: 4846 os.close(fd) 4847 4848 @classmethod 4849 def _test_closefds(cls, conn, fd): 4850 try: 4851 s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM) 4852 except Exception as e: 4853 conn.send(e) 4854 else: 4855 s.close() 4856 conn.send(None) 4857 4858 def test_closefd(self): 4859 if not HAS_REDUCTION: 4860 raise unittest.SkipTest('requires fd pickling') 4861 4862 reader, writer = multiprocessing.Pipe() 4863 fd = self.get_high_socket_fd() 4864 try: 4865 p = multiprocessing.Process(target=self._test_closefds, 4866 args=(writer, fd)) 4867 p.start() 4868 writer.close() 4869 e = reader.recv() 4870 join_process(p) 4871 finally: 4872 self.close(fd) 4873 writer.close() 4874 reader.close() 4875 4876 if multiprocessing.get_start_method() == 'fork': 4877 self.assertIs(e, None) 4878 else: 4879 WSAENOTSOCK = 10038 4880 self.assertIsInstance(e, OSError) 4881 self.assertTrue(e.errno == errno.EBADF or 4882 e.winerror == WSAENOTSOCK, e) 4883 4884# 4885# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc 4886# 4887 4888class TestIgnoreEINTR(unittest.TestCase): 4889 4890 # Sending CONN_MAX_SIZE bytes into a multiprocessing pipe must block 4891 CONN_MAX_SIZE = max(support.PIPE_MAX_SIZE, support.SOCK_MAX_SIZE) 4892 4893 @classmethod 4894 def _test_ignore(cls, conn): 4895 def handler(signum, frame): 4896 pass 4897 signal.signal(signal.SIGUSR1, handler) 4898 conn.send('ready') 4899 x = conn.recv() 4900 conn.send(x) 4901 conn.send_bytes(b'x' * cls.CONN_MAX_SIZE) 4902 4903 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') 4904 def test_ignore(self): 4905 conn, child_conn = multiprocessing.Pipe() 4906 try: 4907 p = multiprocessing.Process(target=self._test_ignore, 4908 args=(child_conn,)) 4909 p.daemon = True 4910 p.start() 4911 child_conn.close() 4912 self.assertEqual(conn.recv(), 'ready') 4913 time.sleep(0.1) 4914 os.kill(p.pid, signal.SIGUSR1) 4915 time.sleep(0.1) 4916 conn.send(1234) 4917 self.assertEqual(conn.recv(), 1234) 4918 time.sleep(0.1) 4919 os.kill(p.pid, signal.SIGUSR1) 4920 self.assertEqual(conn.recv_bytes(), b'x' * self.CONN_MAX_SIZE) 4921 time.sleep(0.1) 4922 p.join() 4923 finally: 4924 conn.close() 4925 4926 @classmethod 4927 def _test_ignore_listener(cls, conn): 4928 def handler(signum, frame): 4929 pass 4930 signal.signal(signal.SIGUSR1, handler) 4931 with multiprocessing.connection.Listener() as l: 4932 conn.send(l.address) 4933 a = l.accept() 4934 a.send('welcome') 4935 4936 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') 4937 def test_ignore_listener(self): 4938 conn, child_conn = multiprocessing.Pipe() 4939 try: 4940 p = multiprocessing.Process(target=self._test_ignore_listener, 4941 args=(child_conn,)) 4942 p.daemon = True 4943 p.start() 4944 child_conn.close() 4945 address = conn.recv() 4946 time.sleep(0.1) 4947 os.kill(p.pid, signal.SIGUSR1) 4948 time.sleep(0.1) 4949 client = multiprocessing.connection.Client(address) 4950 self.assertEqual(client.recv(), 'welcome') 4951 p.join() 4952 finally: 4953 conn.close() 4954 4955class TestStartMethod(unittest.TestCase): 4956 @classmethod 4957 def _check_context(cls, conn): 4958 conn.send(multiprocessing.get_start_method()) 4959 4960 def check_context(self, ctx): 4961 r, w = ctx.Pipe(duplex=False) 4962 p = ctx.Process(target=self._check_context, args=(w,)) 4963 p.start() 4964 w.close() 4965 child_method = r.recv() 4966 r.close() 4967 p.join() 4968 self.assertEqual(child_method, ctx.get_start_method()) 4969 4970 def test_context(self): 4971 for method in ('fork', 'spawn', 'forkserver'): 4972 try: 4973 ctx = multiprocessing.get_context(method) 4974 except ValueError: 4975 continue 4976 self.assertEqual(ctx.get_start_method(), method) 4977 self.assertIs(ctx.get_context(), ctx) 4978 self.assertRaises(ValueError, ctx.set_start_method, 'spawn') 4979 self.assertRaises(ValueError, ctx.set_start_method, None) 4980 self.check_context(ctx) 4981 4982 def test_set_get(self): 4983 multiprocessing.set_forkserver_preload(PRELOAD) 4984 count = 0 4985 old_method = multiprocessing.get_start_method() 4986 try: 4987 for method in ('fork', 'spawn', 'forkserver'): 4988 try: 4989 multiprocessing.set_start_method(method, force=True) 4990 except ValueError: 4991 continue 4992 self.assertEqual(multiprocessing.get_start_method(), method) 4993 ctx = multiprocessing.get_context() 4994 self.assertEqual(ctx.get_start_method(), method) 4995 self.assertTrue(type(ctx).__name__.lower().startswith(method)) 4996 self.assertTrue( 4997 ctx.Process.__name__.lower().startswith(method)) 4998 self.check_context(multiprocessing) 4999 count += 1 5000 finally: 5001 multiprocessing.set_start_method(old_method, force=True) 5002 self.assertGreaterEqual(count, 1) 5003 5004 def test_get_all(self): 5005 methods = multiprocessing.get_all_start_methods() 5006 if sys.platform == 'win32': 5007 self.assertEqual(methods, ['spawn']) 5008 else: 5009 self.assertTrue(methods == ['fork', 'spawn'] or 5010 methods == ['spawn', 'fork'] or 5011 methods == ['fork', 'spawn', 'forkserver'] or 5012 methods == ['spawn', 'fork', 'forkserver']) 5013 5014 def test_preload_resources(self): 5015 if multiprocessing.get_start_method() != 'forkserver': 5016 self.skipTest("test only relevant for 'forkserver' method") 5017 name = os.path.join(os.path.dirname(__file__), 'mp_preload.py') 5018 rc, out, err = test.support.script_helper.assert_python_ok(name) 5019 out = out.decode() 5020 err = err.decode() 5021 if out.rstrip() != 'ok' or err != '': 5022 print(out) 5023 print(err) 5024 self.fail("failed spawning forkserver or grandchild") 5025 5026 5027@unittest.skipIf(sys.platform == "win32", 5028 "test semantics don't make sense on Windows") 5029class TestResourceTracker(unittest.TestCase): 5030 5031 def test_resource_tracker(self): 5032 # 5033 # Check that killing process does not leak named semaphores 5034 # 5035 cmd = '''if 1: 5036 import time, os, tempfile 5037 import multiprocessing as mp 5038 from multiprocessing import resource_tracker 5039 from multiprocessing.shared_memory import SharedMemory 5040 5041 mp.set_start_method("spawn") 5042 rand = tempfile._RandomNameSequence() 5043 5044 5045 def create_and_register_resource(rtype): 5046 if rtype == "semaphore": 5047 lock = mp.Lock() 5048 return lock, lock._semlock.name 5049 elif rtype == "shared_memory": 5050 sm = SharedMemory(create=True, size=10) 5051 return sm, sm._name 5052 else: 5053 raise ValueError( 5054 "Resource type {{}} not understood".format(rtype)) 5055 5056 5057 resource1, rname1 = create_and_register_resource("{rtype}") 5058 resource2, rname2 = create_and_register_resource("{rtype}") 5059 5060 os.write({w}, rname1.encode("ascii") + b"\\n") 5061 os.write({w}, rname2.encode("ascii") + b"\\n") 5062 5063 time.sleep(10) 5064 ''' 5065 for rtype in resource_tracker._CLEANUP_FUNCS: 5066 with self.subTest(rtype=rtype): 5067 if rtype == "noop": 5068 # Artefact resource type used by the resource_tracker 5069 continue 5070 r, w = os.pipe() 5071 p = subprocess.Popen([sys.executable, 5072 '-E', '-c', cmd.format(w=w, rtype=rtype)], 5073 pass_fds=[w], 5074 stderr=subprocess.PIPE) 5075 os.close(w) 5076 with open(r, 'rb', closefd=True) as f: 5077 name1 = f.readline().rstrip().decode('ascii') 5078 name2 = f.readline().rstrip().decode('ascii') 5079 _resource_unlink(name1, rtype) 5080 p.terminate() 5081 p.wait() 5082 5083 deadline = time.monotonic() + 60 5084 while time.monotonic() < deadline: 5085 time.sleep(.5) 5086 try: 5087 _resource_unlink(name2, rtype) 5088 except OSError as e: 5089 # docs say it should be ENOENT, but OSX seems to give 5090 # EINVAL 5091 self.assertIn(e.errno, (errno.ENOENT, errno.EINVAL)) 5092 break 5093 else: 5094 raise AssertionError( 5095 f"A {rtype} resource was leaked after a process was " 5096 f"abruptly terminated.") 5097 err = p.stderr.read().decode('utf-8') 5098 p.stderr.close() 5099 expected = ('resource_tracker: There appear to be 2 leaked {} ' 5100 'objects'.format( 5101 rtype)) 5102 self.assertRegex(err, expected) 5103 self.assertRegex(err, r'resource_tracker: %r: \[Errno' % name1) 5104 5105 def check_resource_tracker_death(self, signum, should_die): 5106 # bpo-31310: if the semaphore tracker process has died, it should 5107 # be restarted implicitly. 5108 from multiprocessing.resource_tracker import _resource_tracker 5109 pid = _resource_tracker._pid 5110 if pid is not None: 5111 os.kill(pid, signal.SIGKILL) 5112 os.waitpid(pid, 0) 5113 with warnings.catch_warnings(): 5114 warnings.simplefilter("ignore") 5115 _resource_tracker.ensure_running() 5116 pid = _resource_tracker._pid 5117 5118 os.kill(pid, signum) 5119 time.sleep(1.0) # give it time to die 5120 5121 ctx = multiprocessing.get_context("spawn") 5122 with warnings.catch_warnings(record=True) as all_warn: 5123 warnings.simplefilter("always") 5124 sem = ctx.Semaphore() 5125 sem.acquire() 5126 sem.release() 5127 wr = weakref.ref(sem) 5128 # ensure `sem` gets collected, which triggers communication with 5129 # the semaphore tracker 5130 del sem 5131 gc.collect() 5132 self.assertIsNone(wr()) 5133 if should_die: 5134 self.assertEqual(len(all_warn), 1) 5135 the_warn = all_warn[0] 5136 self.assertTrue(issubclass(the_warn.category, UserWarning)) 5137 self.assertTrue("resource_tracker: process died" 5138 in str(the_warn.message)) 5139 else: 5140 self.assertEqual(len(all_warn), 0) 5141 5142 def test_resource_tracker_sigint(self): 5143 # Catchable signal (ignored by semaphore tracker) 5144 self.check_resource_tracker_death(signal.SIGINT, False) 5145 5146 def test_resource_tracker_sigterm(self): 5147 # Catchable signal (ignored by semaphore tracker) 5148 self.check_resource_tracker_death(signal.SIGTERM, False) 5149 5150 def test_resource_tracker_sigkill(self): 5151 # Uncatchable signal. 5152 self.check_resource_tracker_death(signal.SIGKILL, True) 5153 5154 @staticmethod 5155 def _is_resource_tracker_reused(conn, pid): 5156 from multiprocessing.resource_tracker import _resource_tracker 5157 _resource_tracker.ensure_running() 5158 # The pid should be None in the child process, expect for the fork 5159 # context. It should not be a new value. 5160 reused = _resource_tracker._pid in (None, pid) 5161 reused &= _resource_tracker._check_alive() 5162 conn.send(reused) 5163 5164 def test_resource_tracker_reused(self): 5165 from multiprocessing.resource_tracker import _resource_tracker 5166 _resource_tracker.ensure_running() 5167 pid = _resource_tracker._pid 5168 5169 r, w = multiprocessing.Pipe(duplex=False) 5170 p = multiprocessing.Process(target=self._is_resource_tracker_reused, 5171 args=(w, pid)) 5172 p.start() 5173 is_resource_tracker_reused = r.recv() 5174 5175 # Clean up 5176 p.join() 5177 w.close() 5178 r.close() 5179 5180 self.assertTrue(is_resource_tracker_reused) 5181 5182 5183class TestSimpleQueue(unittest.TestCase): 5184 5185 @classmethod 5186 def _test_empty(cls, queue, child_can_start, parent_can_continue): 5187 child_can_start.wait() 5188 # issue 30301, could fail under spawn and forkserver 5189 try: 5190 queue.put(queue.empty()) 5191 queue.put(queue.empty()) 5192 finally: 5193 parent_can_continue.set() 5194 5195 def test_empty(self): 5196 queue = multiprocessing.SimpleQueue() 5197 child_can_start = multiprocessing.Event() 5198 parent_can_continue = multiprocessing.Event() 5199 5200 proc = multiprocessing.Process( 5201 target=self._test_empty, 5202 args=(queue, child_can_start, parent_can_continue) 5203 ) 5204 proc.daemon = True 5205 proc.start() 5206 5207 self.assertTrue(queue.empty()) 5208 5209 child_can_start.set() 5210 parent_can_continue.wait() 5211 5212 self.assertFalse(queue.empty()) 5213 self.assertEqual(queue.get(), True) 5214 self.assertEqual(queue.get(), False) 5215 self.assertTrue(queue.empty()) 5216 5217 proc.join() 5218 5219 5220class TestPoolNotLeakOnFailure(unittest.TestCase): 5221 5222 def test_release_unused_processes(self): 5223 # Issue #19675: During pool creation, if we can't create a process, 5224 # don't leak already created ones. 5225 will_fail_in = 3 5226 forked_processes = [] 5227 5228 class FailingForkProcess: 5229 def __init__(self, **kwargs): 5230 self.name = 'Fake Process' 5231 self.exitcode = None 5232 self.state = None 5233 forked_processes.append(self) 5234 5235 def start(self): 5236 nonlocal will_fail_in 5237 if will_fail_in <= 0: 5238 raise OSError("Manually induced OSError") 5239 will_fail_in -= 1 5240 self.state = 'started' 5241 5242 def terminate(self): 5243 self.state = 'stopping' 5244 5245 def join(self): 5246 if self.state == 'stopping': 5247 self.state = 'stopped' 5248 5249 def is_alive(self): 5250 return self.state == 'started' or self.state == 'stopping' 5251 5252 with self.assertRaisesRegex(OSError, 'Manually induced OSError'): 5253 p = multiprocessing.pool.Pool(5, context=unittest.mock.MagicMock( 5254 Process=FailingForkProcess)) 5255 p.close() 5256 p.join() 5257 self.assertFalse( 5258 any(process.is_alive() for process in forked_processes)) 5259 5260 5261class TestSyncManagerTypes(unittest.TestCase): 5262 """Test all the types which can be shared between a parent and a 5263 child process by using a manager which acts as an intermediary 5264 between them. 5265 5266 In the following unit-tests the base type is created in the parent 5267 process, the @classmethod represents the worker process and the 5268 shared object is readable and editable between the two. 5269 5270 # The child. 5271 @classmethod 5272 def _test_list(cls, obj): 5273 assert obj[0] == 5 5274 assert obj.append(6) 5275 5276 # The parent. 5277 def test_list(self): 5278 o = self.manager.list() 5279 o.append(5) 5280 self.run_worker(self._test_list, o) 5281 assert o[1] == 6 5282 """ 5283 manager_class = multiprocessing.managers.SyncManager 5284 5285 def setUp(self): 5286 self.manager = self.manager_class() 5287 self.manager.start() 5288 self.proc = None 5289 5290 def tearDown(self): 5291 if self.proc is not None and self.proc.is_alive(): 5292 self.proc.terminate() 5293 self.proc.join() 5294 self.manager.shutdown() 5295 self.manager = None 5296 self.proc = None 5297 5298 @classmethod 5299 def setUpClass(cls): 5300 support.reap_children() 5301 5302 tearDownClass = setUpClass 5303 5304 def wait_proc_exit(self): 5305 # Only the manager process should be returned by active_children() 5306 # but this can take a bit on slow machines, so wait a few seconds 5307 # if there are other children too (see #17395). 5308 join_process(self.proc) 5309 start_time = time.monotonic() 5310 t = 0.01 5311 while len(multiprocessing.active_children()) > 1: 5312 time.sleep(t) 5313 t *= 2 5314 dt = time.monotonic() - start_time 5315 if dt >= 5.0: 5316 test.support.environment_altered = True 5317 support.print_warning(f"multiprocessing.Manager still has " 5318 f"{multiprocessing.active_children()} " 5319 f"active children after {dt} seconds") 5320 break 5321 5322 def run_worker(self, worker, obj): 5323 self.proc = multiprocessing.Process(target=worker, args=(obj, )) 5324 self.proc.daemon = True 5325 self.proc.start() 5326 self.wait_proc_exit() 5327 self.assertEqual(self.proc.exitcode, 0) 5328 5329 @classmethod 5330 def _test_event(cls, obj): 5331 assert obj.is_set() 5332 obj.wait() 5333 obj.clear() 5334 obj.wait(0.001) 5335 5336 def test_event(self): 5337 o = self.manager.Event() 5338 o.set() 5339 self.run_worker(self._test_event, o) 5340 assert not o.is_set() 5341 o.wait(0.001) 5342 5343 @classmethod 5344 def _test_lock(cls, obj): 5345 obj.acquire() 5346 5347 def test_lock(self, lname="Lock"): 5348 o = getattr(self.manager, lname)() 5349 self.run_worker(self._test_lock, o) 5350 o.release() 5351 self.assertRaises(RuntimeError, o.release) # already released 5352 5353 @classmethod 5354 def _test_rlock(cls, obj): 5355 obj.acquire() 5356 obj.release() 5357 5358 def test_rlock(self, lname="Lock"): 5359 o = getattr(self.manager, lname)() 5360 self.run_worker(self._test_rlock, o) 5361 5362 @classmethod 5363 def _test_semaphore(cls, obj): 5364 obj.acquire() 5365 5366 def test_semaphore(self, sname="Semaphore"): 5367 o = getattr(self.manager, sname)() 5368 self.run_worker(self._test_semaphore, o) 5369 o.release() 5370 5371 def test_bounded_semaphore(self): 5372 self.test_semaphore(sname="BoundedSemaphore") 5373 5374 @classmethod 5375 def _test_condition(cls, obj): 5376 obj.acquire() 5377 obj.release() 5378 5379 def test_condition(self): 5380 o = self.manager.Condition() 5381 self.run_worker(self._test_condition, o) 5382 5383 @classmethod 5384 def _test_barrier(cls, obj): 5385 assert obj.parties == 5 5386 obj.reset() 5387 5388 def test_barrier(self): 5389 o = self.manager.Barrier(5) 5390 self.run_worker(self._test_barrier, o) 5391 5392 @classmethod 5393 def _test_pool(cls, obj): 5394 # TODO: fix https://bugs.python.org/issue35919 5395 with obj: 5396 pass 5397 5398 def test_pool(self): 5399 o = self.manager.Pool(processes=4) 5400 self.run_worker(self._test_pool, o) 5401 5402 @classmethod 5403 def _test_queue(cls, obj): 5404 assert obj.qsize() == 2 5405 assert obj.full() 5406 assert not obj.empty() 5407 assert obj.get() == 5 5408 assert not obj.empty() 5409 assert obj.get() == 6 5410 assert obj.empty() 5411 5412 def test_queue(self, qname="Queue"): 5413 o = getattr(self.manager, qname)(2) 5414 o.put(5) 5415 o.put(6) 5416 self.run_worker(self._test_queue, o) 5417 assert o.empty() 5418 assert not o.full() 5419 5420 def test_joinable_queue(self): 5421 self.test_queue("JoinableQueue") 5422 5423 @classmethod 5424 def _test_list(cls, obj): 5425 assert obj[0] == 5 5426 assert obj.count(5) == 1 5427 assert obj.index(5) == 0 5428 obj.sort() 5429 obj.reverse() 5430 for x in obj: 5431 pass 5432 assert len(obj) == 1 5433 assert obj.pop(0) == 5 5434 5435 def test_list(self): 5436 o = self.manager.list() 5437 o.append(5) 5438 self.run_worker(self._test_list, o) 5439 assert not o 5440 self.assertEqual(len(o), 0) 5441 5442 @classmethod 5443 def _test_dict(cls, obj): 5444 assert len(obj) == 1 5445 assert obj['foo'] == 5 5446 assert obj.get('foo') == 5 5447 assert list(obj.items()) == [('foo', 5)] 5448 assert list(obj.keys()) == ['foo'] 5449 assert list(obj.values()) == [5] 5450 assert obj.copy() == {'foo': 5} 5451 assert obj.popitem() == ('foo', 5) 5452 5453 def test_dict(self): 5454 o = self.manager.dict() 5455 o['foo'] = 5 5456 self.run_worker(self._test_dict, o) 5457 assert not o 5458 self.assertEqual(len(o), 0) 5459 5460 @classmethod 5461 def _test_value(cls, obj): 5462 assert obj.value == 1 5463 assert obj.get() == 1 5464 obj.set(2) 5465 5466 def test_value(self): 5467 o = self.manager.Value('i', 1) 5468 self.run_worker(self._test_value, o) 5469 self.assertEqual(o.value, 2) 5470 self.assertEqual(o.get(), 2) 5471 5472 @classmethod 5473 def _test_array(cls, obj): 5474 assert obj[0] == 0 5475 assert obj[1] == 1 5476 assert len(obj) == 2 5477 assert list(obj) == [0, 1] 5478 5479 def test_array(self): 5480 o = self.manager.Array('i', [0, 1]) 5481 self.run_worker(self._test_array, o) 5482 5483 @classmethod 5484 def _test_namespace(cls, obj): 5485 assert obj.x == 0 5486 assert obj.y == 1 5487 5488 def test_namespace(self): 5489 o = self.manager.Namespace() 5490 o.x = 0 5491 o.y = 1 5492 self.run_worker(self._test_namespace, o) 5493 5494 5495class MiscTestCase(unittest.TestCase): 5496 def test__all__(self): 5497 # Just make sure names in blacklist are excluded 5498 support.check__all__(self, multiprocessing, extra=multiprocessing.__all__, 5499 blacklist=['SUBDEBUG', 'SUBWARNING']) 5500# 5501# Mixins 5502# 5503 5504class BaseMixin(object): 5505 @classmethod 5506 def setUpClass(cls): 5507 cls.dangling = (multiprocessing.process._dangling.copy(), 5508 threading._dangling.copy()) 5509 5510 @classmethod 5511 def tearDownClass(cls): 5512 # bpo-26762: Some multiprocessing objects like Pool create reference 5513 # cycles. Trigger a garbage collection to break these cycles. 5514 test.support.gc_collect() 5515 5516 processes = set(multiprocessing.process._dangling) - set(cls.dangling[0]) 5517 if processes: 5518 test.support.environment_altered = True 5519 support.print_warning(f'Dangling processes: {processes}') 5520 processes = None 5521 5522 threads = set(threading._dangling) - set(cls.dangling[1]) 5523 if threads: 5524 test.support.environment_altered = True 5525 support.print_warning(f'Dangling threads: {threads}') 5526 threads = None 5527 5528 5529class ProcessesMixin(BaseMixin): 5530 TYPE = 'processes' 5531 Process = multiprocessing.Process 5532 connection = multiprocessing.connection 5533 current_process = staticmethod(multiprocessing.current_process) 5534 parent_process = staticmethod(multiprocessing.parent_process) 5535 active_children = staticmethod(multiprocessing.active_children) 5536 Pool = staticmethod(multiprocessing.Pool) 5537 Pipe = staticmethod(multiprocessing.Pipe) 5538 Queue = staticmethod(multiprocessing.Queue) 5539 JoinableQueue = staticmethod(multiprocessing.JoinableQueue) 5540 Lock = staticmethod(multiprocessing.Lock) 5541 RLock = staticmethod(multiprocessing.RLock) 5542 Semaphore = staticmethod(multiprocessing.Semaphore) 5543 BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore) 5544 Condition = staticmethod(multiprocessing.Condition) 5545 Event = staticmethod(multiprocessing.Event) 5546 Barrier = staticmethod(multiprocessing.Barrier) 5547 Value = staticmethod(multiprocessing.Value) 5548 Array = staticmethod(multiprocessing.Array) 5549 RawValue = staticmethod(multiprocessing.RawValue) 5550 RawArray = staticmethod(multiprocessing.RawArray) 5551 5552 5553class ManagerMixin(BaseMixin): 5554 TYPE = 'manager' 5555 Process = multiprocessing.Process 5556 Queue = property(operator.attrgetter('manager.Queue')) 5557 JoinableQueue = property(operator.attrgetter('manager.JoinableQueue')) 5558 Lock = property(operator.attrgetter('manager.Lock')) 5559 RLock = property(operator.attrgetter('manager.RLock')) 5560 Semaphore = property(operator.attrgetter('manager.Semaphore')) 5561 BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore')) 5562 Condition = property(operator.attrgetter('manager.Condition')) 5563 Event = property(operator.attrgetter('manager.Event')) 5564 Barrier = property(operator.attrgetter('manager.Barrier')) 5565 Value = property(operator.attrgetter('manager.Value')) 5566 Array = property(operator.attrgetter('manager.Array')) 5567 list = property(operator.attrgetter('manager.list')) 5568 dict = property(operator.attrgetter('manager.dict')) 5569 Namespace = property(operator.attrgetter('manager.Namespace')) 5570 5571 @classmethod 5572 def Pool(cls, *args, **kwds): 5573 return cls.manager.Pool(*args, **kwds) 5574 5575 @classmethod 5576 def setUpClass(cls): 5577 super().setUpClass() 5578 cls.manager = multiprocessing.Manager() 5579 5580 @classmethod 5581 def tearDownClass(cls): 5582 # only the manager process should be returned by active_children() 5583 # but this can take a bit on slow machines, so wait a few seconds 5584 # if there are other children too (see #17395) 5585 start_time = time.monotonic() 5586 t = 0.01 5587 while len(multiprocessing.active_children()) > 1: 5588 time.sleep(t) 5589 t *= 2 5590 dt = time.monotonic() - start_time 5591 if dt >= 5.0: 5592 test.support.environment_altered = True 5593 support.print_warning(f"multiprocessing.Manager still has " 5594 f"{multiprocessing.active_children()} " 5595 f"active children after {dt} seconds") 5596 break 5597 5598 gc.collect() # do garbage collection 5599 if cls.manager._number_of_objects() != 0: 5600 # This is not really an error since some tests do not 5601 # ensure that all processes which hold a reference to a 5602 # managed object have been joined. 5603 test.support.environment_altered = True 5604 support.print_warning('Shared objects which still exist ' 5605 'at manager shutdown:') 5606 support.print_warning(cls.manager._debug_info()) 5607 cls.manager.shutdown() 5608 cls.manager.join() 5609 cls.manager = None 5610 5611 super().tearDownClass() 5612 5613 5614class ThreadsMixin(BaseMixin): 5615 TYPE = 'threads' 5616 Process = multiprocessing.dummy.Process 5617 connection = multiprocessing.dummy.connection 5618 current_process = staticmethod(multiprocessing.dummy.current_process) 5619 active_children = staticmethod(multiprocessing.dummy.active_children) 5620 Pool = staticmethod(multiprocessing.dummy.Pool) 5621 Pipe = staticmethod(multiprocessing.dummy.Pipe) 5622 Queue = staticmethod(multiprocessing.dummy.Queue) 5623 JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue) 5624 Lock = staticmethod(multiprocessing.dummy.Lock) 5625 RLock = staticmethod(multiprocessing.dummy.RLock) 5626 Semaphore = staticmethod(multiprocessing.dummy.Semaphore) 5627 BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore) 5628 Condition = staticmethod(multiprocessing.dummy.Condition) 5629 Event = staticmethod(multiprocessing.dummy.Event) 5630 Barrier = staticmethod(multiprocessing.dummy.Barrier) 5631 Value = staticmethod(multiprocessing.dummy.Value) 5632 Array = staticmethod(multiprocessing.dummy.Array) 5633 5634# 5635# Functions used to create test cases from the base ones in this module 5636# 5637 5638def install_tests_in_module_dict(remote_globs, start_method): 5639 __module__ = remote_globs['__name__'] 5640 local_globs = globals() 5641 ALL_TYPES = {'processes', 'threads', 'manager'} 5642 5643 for name, base in local_globs.items(): 5644 if not isinstance(base, type): 5645 continue 5646 if issubclass(base, BaseTestCase): 5647 if base is BaseTestCase: 5648 continue 5649 assert set(base.ALLOWED_TYPES) <= ALL_TYPES, base.ALLOWED_TYPES 5650 for type_ in base.ALLOWED_TYPES: 5651 newname = 'With' + type_.capitalize() + name[1:] 5652 Mixin = local_globs[type_.capitalize() + 'Mixin'] 5653 class Temp(base, Mixin, unittest.TestCase): 5654 pass 5655 Temp.__name__ = Temp.__qualname__ = newname 5656 Temp.__module__ = __module__ 5657 remote_globs[newname] = Temp 5658 elif issubclass(base, unittest.TestCase): 5659 class Temp(base, object): 5660 pass 5661 Temp.__name__ = Temp.__qualname__ = name 5662 Temp.__module__ = __module__ 5663 remote_globs[name] = Temp 5664 5665 dangling = [None, None] 5666 old_start_method = [None] 5667 5668 def setUpModule(): 5669 multiprocessing.set_forkserver_preload(PRELOAD) 5670 multiprocessing.process._cleanup() 5671 dangling[0] = multiprocessing.process._dangling.copy() 5672 dangling[1] = threading._dangling.copy() 5673 old_start_method[0] = multiprocessing.get_start_method(allow_none=True) 5674 try: 5675 multiprocessing.set_start_method(start_method, force=True) 5676 except ValueError: 5677 raise unittest.SkipTest(start_method + 5678 ' start method not supported') 5679 5680 if sys.platform.startswith("linux"): 5681 try: 5682 lock = multiprocessing.RLock() 5683 except OSError: 5684 raise unittest.SkipTest("OSError raises on RLock creation, " 5685 "see issue 3111!") 5686 check_enough_semaphores() 5687 util.get_temp_dir() # creates temp directory 5688 multiprocessing.get_logger().setLevel(LOG_LEVEL) 5689 5690 def tearDownModule(): 5691 need_sleep = False 5692 5693 # bpo-26762: Some multiprocessing objects like Pool create reference 5694 # cycles. Trigger a garbage collection to break these cycles. 5695 test.support.gc_collect() 5696 5697 multiprocessing.set_start_method(old_start_method[0], force=True) 5698 # pause a bit so we don't get warning about dangling threads/processes 5699 processes = set(multiprocessing.process._dangling) - set(dangling[0]) 5700 if processes: 5701 need_sleep = True 5702 test.support.environment_altered = True 5703 support.print_warning(f'Dangling processes: {processes}') 5704 processes = None 5705 5706 threads = set(threading._dangling) - set(dangling[1]) 5707 if threads: 5708 need_sleep = True 5709 test.support.environment_altered = True 5710 support.print_warning(f'Dangling threads: {threads}') 5711 threads = None 5712 5713 # Sleep 500 ms to give time to child processes to complete. 5714 if need_sleep: 5715 time.sleep(0.5) 5716 5717 multiprocessing.util._cleanup_tests() 5718 5719 remote_globs['setUpModule'] = setUpModule 5720 remote_globs['tearDownModule'] = tearDownModule 5721