1# 2# Unit tests for the multiprocessing package 3# 4 5import unittest 6import unittest.mock 7import queue as pyqueue 8import time 9import io 10import itertools 11import sys 12import os 13import gc 14import errno 15import signal 16import array 17import socket 18import random 19import logging 20import subprocess 21import struct 22import operator 23import pickle 24import weakref 25import warnings 26import test.support 27import test.support.script_helper 28from test import support 29from test.support import hashlib_helper 30from test.support import import_helper 31from test.support import os_helper 32from test.support import socket_helper 33from test.support import threading_helper 34from test.support import warnings_helper 35 36 37# Skip tests if _multiprocessing wasn't built. 38_multiprocessing = import_helper.import_module('_multiprocessing') 39# Skip tests if sem_open implementation is broken. 40support.skip_if_broken_multiprocessing_synchronize() 41import threading 42 43import multiprocessing.connection 44import multiprocessing.dummy 45import multiprocessing.heap 46import multiprocessing.managers 47import multiprocessing.pool 48import multiprocessing.queues 49 50from multiprocessing import util 51 52try: 53 from multiprocessing import reduction 54 HAS_REDUCTION = reduction.HAVE_SEND_HANDLE 55except ImportError: 56 HAS_REDUCTION = False 57 58try: 59 from multiprocessing.sharedctypes import Value, copy 60 HAS_SHAREDCTYPES = True 61except ImportError: 62 HAS_SHAREDCTYPES = False 63 64try: 65 from multiprocessing import shared_memory 66 HAS_SHMEM = True 67except ImportError: 68 HAS_SHMEM = False 69 70try: 71 import msvcrt 72except ImportError: 73 msvcrt = None 74 75 76if support.check_sanitizer(address=True): 77 # bpo-45200: Skip multiprocessing tests if Python is built with ASAN to 78 # work around a libasan race condition: dead lock in pthread_create(). 79 raise unittest.SkipTest("libasan has a pthread_create() dead lock") 80 81 82def latin(s): 83 return s.encode('latin') 84 85 86def close_queue(queue): 87 if isinstance(queue, multiprocessing.queues.Queue): 88 queue.close() 89 queue.join_thread() 90 91 92def join_process(process): 93 # Since multiprocessing.Process has the same API than threading.Thread 94 # (join() and is_alive(), the support function can be reused 95 threading_helper.join_thread(process) 96 97 98if os.name == "posix": 99 from multiprocessing import resource_tracker 100 101 def _resource_unlink(name, rtype): 102 resource_tracker._CLEANUP_FUNCS[rtype](name) 103 104 105# 106# Constants 107# 108 109LOG_LEVEL = util.SUBWARNING 110#LOG_LEVEL = logging.DEBUG 111 112DELTA = 0.1 113CHECK_TIMINGS = False # making true makes tests take a lot longer 114 # and can sometimes cause some non-serious 115 # failures because some calls block a bit 116 # longer than expected 117if CHECK_TIMINGS: 118 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4 119else: 120 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1 121 122HAVE_GETVALUE = not getattr(_multiprocessing, 123 'HAVE_BROKEN_SEM_GETVALUE', False) 124 125WIN32 = (sys.platform == "win32") 126 127from multiprocessing.connection import wait 128 129def wait_for_handle(handle, timeout): 130 if timeout is not None and timeout < 0.0: 131 timeout = None 132 return wait([handle], timeout) 133 134try: 135 MAXFD = os.sysconf("SC_OPEN_MAX") 136except: 137 MAXFD = 256 138 139# To speed up tests when using the forkserver, we can preload these: 140PRELOAD = ['__main__', 'test.test_multiprocessing_forkserver'] 141 142# 143# Some tests require ctypes 144# 145 146try: 147 from ctypes import Structure, c_int, c_double, c_longlong 148except ImportError: 149 Structure = object 150 c_int = c_double = c_longlong = None 151 152 153def check_enough_semaphores(): 154 """Check that the system supports enough semaphores to run the test.""" 155 # minimum number of semaphores available according to POSIX 156 nsems_min = 256 157 try: 158 nsems = os.sysconf("SC_SEM_NSEMS_MAX") 159 except (AttributeError, ValueError): 160 # sysconf not available or setting not available 161 return 162 if nsems == -1 or nsems >= nsems_min: 163 return 164 raise unittest.SkipTest("The OS doesn't support enough semaphores " 165 "to run the test (required: %d)." % nsems_min) 166 167 168# 169# Creates a wrapper for a function which records the time it takes to finish 170# 171 172class TimingWrapper(object): 173 174 def __init__(self, func): 175 self.func = func 176 self.elapsed = None 177 178 def __call__(self, *args, **kwds): 179 t = time.monotonic() 180 try: 181 return self.func(*args, **kwds) 182 finally: 183 self.elapsed = time.monotonic() - t 184 185# 186# Base class for test cases 187# 188 189class BaseTestCase(object): 190 191 ALLOWED_TYPES = ('processes', 'manager', 'threads') 192 193 def assertTimingAlmostEqual(self, a, b): 194 if CHECK_TIMINGS: 195 self.assertAlmostEqual(a, b, 1) 196 197 def assertReturnsIfImplemented(self, value, func, *args): 198 try: 199 res = func(*args) 200 except NotImplementedError: 201 pass 202 else: 203 return self.assertEqual(value, res) 204 205 # For the sanity of Windows users, rather than crashing or freezing in 206 # multiple ways. 207 def __reduce__(self, *args): 208 raise NotImplementedError("shouldn't try to pickle a test case") 209 210 __reduce_ex__ = __reduce__ 211 212# 213# Return the value of a semaphore 214# 215 216def get_value(self): 217 try: 218 return self.get_value() 219 except AttributeError: 220 try: 221 return self._Semaphore__value 222 except AttributeError: 223 try: 224 return self._value 225 except AttributeError: 226 raise NotImplementedError 227 228# 229# Testcases 230# 231 232class DummyCallable: 233 def __call__(self, q, c): 234 assert isinstance(c, DummyCallable) 235 q.put(5) 236 237 238class _TestProcess(BaseTestCase): 239 240 ALLOWED_TYPES = ('processes', 'threads') 241 242 def test_current(self): 243 if self.TYPE == 'threads': 244 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 245 246 current = self.current_process() 247 authkey = current.authkey 248 249 self.assertTrue(current.is_alive()) 250 self.assertTrue(not current.daemon) 251 self.assertIsInstance(authkey, bytes) 252 self.assertTrue(len(authkey) > 0) 253 self.assertEqual(current.ident, os.getpid()) 254 self.assertEqual(current.exitcode, None) 255 256 def test_daemon_argument(self): 257 if self.TYPE == "threads": 258 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 259 260 # By default uses the current process's daemon flag. 261 proc0 = self.Process(target=self._test) 262 self.assertEqual(proc0.daemon, self.current_process().daemon) 263 proc1 = self.Process(target=self._test, daemon=True) 264 self.assertTrue(proc1.daemon) 265 proc2 = self.Process(target=self._test, daemon=False) 266 self.assertFalse(proc2.daemon) 267 268 @classmethod 269 def _test(cls, q, *args, **kwds): 270 current = cls.current_process() 271 q.put(args) 272 q.put(kwds) 273 q.put(current.name) 274 if cls.TYPE != 'threads': 275 q.put(bytes(current.authkey)) 276 q.put(current.pid) 277 278 def test_parent_process_attributes(self): 279 if self.TYPE == "threads": 280 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 281 282 self.assertIsNone(self.parent_process()) 283 284 rconn, wconn = self.Pipe(duplex=False) 285 p = self.Process(target=self._test_send_parent_process, args=(wconn,)) 286 p.start() 287 p.join() 288 parent_pid, parent_name = rconn.recv() 289 self.assertEqual(parent_pid, self.current_process().pid) 290 self.assertEqual(parent_pid, os.getpid()) 291 self.assertEqual(parent_name, self.current_process().name) 292 293 @classmethod 294 def _test_send_parent_process(cls, wconn): 295 from multiprocessing.process import parent_process 296 wconn.send([parent_process().pid, parent_process().name]) 297 298 def test_parent_process(self): 299 if self.TYPE == "threads": 300 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 301 302 # Launch a child process. Make it launch a grandchild process. Kill the 303 # child process and make sure that the grandchild notices the death of 304 # its parent (a.k.a the child process). 305 rconn, wconn = self.Pipe(duplex=False) 306 p = self.Process( 307 target=self._test_create_grandchild_process, args=(wconn, )) 308 p.start() 309 310 if not rconn.poll(timeout=support.LONG_TIMEOUT): 311 raise AssertionError("Could not communicate with child process") 312 parent_process_status = rconn.recv() 313 self.assertEqual(parent_process_status, "alive") 314 315 p.terminate() 316 p.join() 317 318 if not rconn.poll(timeout=support.LONG_TIMEOUT): 319 raise AssertionError("Could not communicate with child process") 320 parent_process_status = rconn.recv() 321 self.assertEqual(parent_process_status, "not alive") 322 323 @classmethod 324 def _test_create_grandchild_process(cls, wconn): 325 p = cls.Process(target=cls._test_report_parent_status, args=(wconn, )) 326 p.start() 327 time.sleep(300) 328 329 @classmethod 330 def _test_report_parent_status(cls, wconn): 331 from multiprocessing.process import parent_process 332 wconn.send("alive" if parent_process().is_alive() else "not alive") 333 parent_process().join(timeout=support.SHORT_TIMEOUT) 334 wconn.send("alive" if parent_process().is_alive() else "not alive") 335 336 def test_process(self): 337 q = self.Queue(1) 338 e = self.Event() 339 args = (q, 1, 2) 340 kwargs = {'hello':23, 'bye':2.54} 341 name = 'SomeProcess' 342 p = self.Process( 343 target=self._test, args=args, kwargs=kwargs, name=name 344 ) 345 p.daemon = True 346 current = self.current_process() 347 348 if self.TYPE != 'threads': 349 self.assertEqual(p.authkey, current.authkey) 350 self.assertEqual(p.is_alive(), False) 351 self.assertEqual(p.daemon, True) 352 self.assertNotIn(p, self.active_children()) 353 self.assertTrue(type(self.active_children()) is list) 354 self.assertEqual(p.exitcode, None) 355 356 p.start() 357 358 self.assertEqual(p.exitcode, None) 359 self.assertEqual(p.is_alive(), True) 360 self.assertIn(p, self.active_children()) 361 362 self.assertEqual(q.get(), args[1:]) 363 self.assertEqual(q.get(), kwargs) 364 self.assertEqual(q.get(), p.name) 365 if self.TYPE != 'threads': 366 self.assertEqual(q.get(), current.authkey) 367 self.assertEqual(q.get(), p.pid) 368 369 p.join() 370 371 self.assertEqual(p.exitcode, 0) 372 self.assertEqual(p.is_alive(), False) 373 self.assertNotIn(p, self.active_children()) 374 close_queue(q) 375 376 @unittest.skipUnless(threading._HAVE_THREAD_NATIVE_ID, "needs native_id") 377 def test_process_mainthread_native_id(self): 378 if self.TYPE == 'threads': 379 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 380 381 current_mainthread_native_id = threading.main_thread().native_id 382 383 q = self.Queue(1) 384 p = self.Process(target=self._test_process_mainthread_native_id, args=(q,)) 385 p.start() 386 387 child_mainthread_native_id = q.get() 388 p.join() 389 close_queue(q) 390 391 self.assertNotEqual(current_mainthread_native_id, child_mainthread_native_id) 392 393 @classmethod 394 def _test_process_mainthread_native_id(cls, q): 395 mainthread_native_id = threading.main_thread().native_id 396 q.put(mainthread_native_id) 397 398 @classmethod 399 def _sleep_some(cls): 400 time.sleep(100) 401 402 @classmethod 403 def _test_sleep(cls, delay): 404 time.sleep(delay) 405 406 def _kill_process(self, meth): 407 if self.TYPE == 'threads': 408 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 409 410 p = self.Process(target=self._sleep_some) 411 p.daemon = True 412 p.start() 413 414 self.assertEqual(p.is_alive(), True) 415 self.assertIn(p, self.active_children()) 416 self.assertEqual(p.exitcode, None) 417 418 join = TimingWrapper(p.join) 419 420 self.assertEqual(join(0), None) 421 self.assertTimingAlmostEqual(join.elapsed, 0.0) 422 self.assertEqual(p.is_alive(), True) 423 424 self.assertEqual(join(-1), None) 425 self.assertTimingAlmostEqual(join.elapsed, 0.0) 426 self.assertEqual(p.is_alive(), True) 427 428 # XXX maybe terminating too soon causes the problems on Gentoo... 429 time.sleep(1) 430 431 meth(p) 432 433 if hasattr(signal, 'alarm'): 434 # On the Gentoo buildbot waitpid() often seems to block forever. 435 # We use alarm() to interrupt it if it blocks for too long. 436 def handler(*args): 437 raise RuntimeError('join took too long: %s' % p) 438 old_handler = signal.signal(signal.SIGALRM, handler) 439 try: 440 signal.alarm(10) 441 self.assertEqual(join(), None) 442 finally: 443 signal.alarm(0) 444 signal.signal(signal.SIGALRM, old_handler) 445 else: 446 self.assertEqual(join(), None) 447 448 self.assertTimingAlmostEqual(join.elapsed, 0.0) 449 450 self.assertEqual(p.is_alive(), False) 451 self.assertNotIn(p, self.active_children()) 452 453 p.join() 454 455 return p.exitcode 456 457 def test_terminate(self): 458 exitcode = self._kill_process(multiprocessing.Process.terminate) 459 if os.name != 'nt': 460 self.assertEqual(exitcode, -signal.SIGTERM) 461 462 def test_kill(self): 463 exitcode = self._kill_process(multiprocessing.Process.kill) 464 if os.name != 'nt': 465 self.assertEqual(exitcode, -signal.SIGKILL) 466 467 def test_cpu_count(self): 468 try: 469 cpus = multiprocessing.cpu_count() 470 except NotImplementedError: 471 cpus = 1 472 self.assertTrue(type(cpus) is int) 473 self.assertTrue(cpus >= 1) 474 475 def test_active_children(self): 476 self.assertEqual(type(self.active_children()), list) 477 478 p = self.Process(target=time.sleep, args=(DELTA,)) 479 self.assertNotIn(p, self.active_children()) 480 481 p.daemon = True 482 p.start() 483 self.assertIn(p, self.active_children()) 484 485 p.join() 486 self.assertNotIn(p, self.active_children()) 487 488 @classmethod 489 def _test_recursion(cls, wconn, id): 490 wconn.send(id) 491 if len(id) < 2: 492 for i in range(2): 493 p = cls.Process( 494 target=cls._test_recursion, args=(wconn, id+[i]) 495 ) 496 p.start() 497 p.join() 498 499 def test_recursion(self): 500 rconn, wconn = self.Pipe(duplex=False) 501 self._test_recursion(wconn, []) 502 503 time.sleep(DELTA) 504 result = [] 505 while rconn.poll(): 506 result.append(rconn.recv()) 507 508 expected = [ 509 [], 510 [0], 511 [0, 0], 512 [0, 1], 513 [1], 514 [1, 0], 515 [1, 1] 516 ] 517 self.assertEqual(result, expected) 518 519 @classmethod 520 def _test_sentinel(cls, event): 521 event.wait(10.0) 522 523 def test_sentinel(self): 524 if self.TYPE == "threads": 525 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 526 event = self.Event() 527 p = self.Process(target=self._test_sentinel, args=(event,)) 528 with self.assertRaises(ValueError): 529 p.sentinel 530 p.start() 531 self.addCleanup(p.join) 532 sentinel = p.sentinel 533 self.assertIsInstance(sentinel, int) 534 self.assertFalse(wait_for_handle(sentinel, timeout=0.0)) 535 event.set() 536 p.join() 537 self.assertTrue(wait_for_handle(sentinel, timeout=1)) 538 539 @classmethod 540 def _test_close(cls, rc=0, q=None): 541 if q is not None: 542 q.get() 543 sys.exit(rc) 544 545 def test_close(self): 546 if self.TYPE == "threads": 547 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 548 q = self.Queue() 549 p = self.Process(target=self._test_close, kwargs={'q': q}) 550 p.daemon = True 551 p.start() 552 self.assertEqual(p.is_alive(), True) 553 # Child is still alive, cannot close 554 with self.assertRaises(ValueError): 555 p.close() 556 557 q.put(None) 558 p.join() 559 self.assertEqual(p.is_alive(), False) 560 self.assertEqual(p.exitcode, 0) 561 p.close() 562 with self.assertRaises(ValueError): 563 p.is_alive() 564 with self.assertRaises(ValueError): 565 p.join() 566 with self.assertRaises(ValueError): 567 p.terminate() 568 p.close() 569 570 wr = weakref.ref(p) 571 del p 572 gc.collect() 573 self.assertIs(wr(), None) 574 575 close_queue(q) 576 577 def test_many_processes(self): 578 if self.TYPE == 'threads': 579 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 580 581 sm = multiprocessing.get_start_method() 582 N = 5 if sm == 'spawn' else 100 583 584 # Try to overwhelm the forkserver loop with events 585 procs = [self.Process(target=self._test_sleep, args=(0.01,)) 586 for i in range(N)] 587 for p in procs: 588 p.start() 589 for p in procs: 590 join_process(p) 591 for p in procs: 592 self.assertEqual(p.exitcode, 0) 593 594 procs = [self.Process(target=self._sleep_some) 595 for i in range(N)] 596 for p in procs: 597 p.start() 598 time.sleep(0.001) # let the children start... 599 for p in procs: 600 p.terminate() 601 for p in procs: 602 join_process(p) 603 if os.name != 'nt': 604 exitcodes = [-signal.SIGTERM] 605 if sys.platform == 'darwin': 606 # bpo-31510: On macOS, killing a freshly started process with 607 # SIGTERM sometimes kills the process with SIGKILL. 608 exitcodes.append(-signal.SIGKILL) 609 for p in procs: 610 self.assertIn(p.exitcode, exitcodes) 611 612 def test_lose_target_ref(self): 613 c = DummyCallable() 614 wr = weakref.ref(c) 615 q = self.Queue() 616 p = self.Process(target=c, args=(q, c)) 617 del c 618 p.start() 619 p.join() 620 gc.collect() # For PyPy or other GCs. 621 self.assertIs(wr(), None) 622 self.assertEqual(q.get(), 5) 623 close_queue(q) 624 625 @classmethod 626 def _test_child_fd_inflation(self, evt, q): 627 q.put(os_helper.fd_count()) 628 evt.wait() 629 630 def test_child_fd_inflation(self): 631 # Number of fds in child processes should not grow with the 632 # number of running children. 633 if self.TYPE == 'threads': 634 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 635 636 sm = multiprocessing.get_start_method() 637 if sm == 'fork': 638 # The fork method by design inherits all fds from the parent, 639 # trying to go against it is a lost battle 640 self.skipTest('test not appropriate for {}'.format(sm)) 641 642 N = 5 643 evt = self.Event() 644 q = self.Queue() 645 646 procs = [self.Process(target=self._test_child_fd_inflation, args=(evt, q)) 647 for i in range(N)] 648 for p in procs: 649 p.start() 650 651 try: 652 fd_counts = [q.get() for i in range(N)] 653 self.assertEqual(len(set(fd_counts)), 1, fd_counts) 654 655 finally: 656 evt.set() 657 for p in procs: 658 p.join() 659 close_queue(q) 660 661 @classmethod 662 def _test_wait_for_threads(self, evt): 663 def func1(): 664 time.sleep(0.5) 665 evt.set() 666 667 def func2(): 668 time.sleep(20) 669 evt.clear() 670 671 threading.Thread(target=func1).start() 672 threading.Thread(target=func2, daemon=True).start() 673 674 def test_wait_for_threads(self): 675 # A child process should wait for non-daemonic threads to end 676 # before exiting 677 if self.TYPE == 'threads': 678 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 679 680 evt = self.Event() 681 proc = self.Process(target=self._test_wait_for_threads, args=(evt,)) 682 proc.start() 683 proc.join() 684 self.assertTrue(evt.is_set()) 685 686 @classmethod 687 def _test_error_on_stdio_flush(self, evt, break_std_streams={}): 688 for stream_name, action in break_std_streams.items(): 689 if action == 'close': 690 stream = io.StringIO() 691 stream.close() 692 else: 693 assert action == 'remove' 694 stream = None 695 setattr(sys, stream_name, None) 696 evt.set() 697 698 def test_error_on_stdio_flush_1(self): 699 # Check that Process works with broken standard streams 700 streams = [io.StringIO(), None] 701 streams[0].close() 702 for stream_name in ('stdout', 'stderr'): 703 for stream in streams: 704 old_stream = getattr(sys, stream_name) 705 setattr(sys, stream_name, stream) 706 try: 707 evt = self.Event() 708 proc = self.Process(target=self._test_error_on_stdio_flush, 709 args=(evt,)) 710 proc.start() 711 proc.join() 712 self.assertTrue(evt.is_set()) 713 self.assertEqual(proc.exitcode, 0) 714 finally: 715 setattr(sys, stream_name, old_stream) 716 717 def test_error_on_stdio_flush_2(self): 718 # Same as test_error_on_stdio_flush_1(), but standard streams are 719 # broken by the child process 720 for stream_name in ('stdout', 'stderr'): 721 for action in ('close', 'remove'): 722 old_stream = getattr(sys, stream_name) 723 try: 724 evt = self.Event() 725 proc = self.Process(target=self._test_error_on_stdio_flush, 726 args=(evt, {stream_name: action})) 727 proc.start() 728 proc.join() 729 self.assertTrue(evt.is_set()) 730 self.assertEqual(proc.exitcode, 0) 731 finally: 732 setattr(sys, stream_name, old_stream) 733 734 @classmethod 735 def _sleep_and_set_event(self, evt, delay=0.0): 736 time.sleep(delay) 737 evt.set() 738 739 def check_forkserver_death(self, signum): 740 # bpo-31308: if the forkserver process has died, we should still 741 # be able to create and run new Process instances (the forkserver 742 # is implicitly restarted). 743 if self.TYPE == 'threads': 744 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 745 sm = multiprocessing.get_start_method() 746 if sm != 'forkserver': 747 # The fork method by design inherits all fds from the parent, 748 # trying to go against it is a lost battle 749 self.skipTest('test not appropriate for {}'.format(sm)) 750 751 from multiprocessing.forkserver import _forkserver 752 _forkserver.ensure_running() 753 754 # First process sleeps 500 ms 755 delay = 0.5 756 757 evt = self.Event() 758 proc = self.Process(target=self._sleep_and_set_event, args=(evt, delay)) 759 proc.start() 760 761 pid = _forkserver._forkserver_pid 762 os.kill(pid, signum) 763 # give time to the fork server to die and time to proc to complete 764 time.sleep(delay * 2.0) 765 766 evt2 = self.Event() 767 proc2 = self.Process(target=self._sleep_and_set_event, args=(evt2,)) 768 proc2.start() 769 proc2.join() 770 self.assertTrue(evt2.is_set()) 771 self.assertEqual(proc2.exitcode, 0) 772 773 proc.join() 774 self.assertTrue(evt.is_set()) 775 self.assertIn(proc.exitcode, (0, 255)) 776 777 def test_forkserver_sigint(self): 778 # Catchable signal 779 self.check_forkserver_death(signal.SIGINT) 780 781 def test_forkserver_sigkill(self): 782 # Uncatchable signal 783 if os.name != 'nt': 784 self.check_forkserver_death(signal.SIGKILL) 785 786 787# 788# 789# 790 791class _UpperCaser(multiprocessing.Process): 792 793 def __init__(self): 794 multiprocessing.Process.__init__(self) 795 self.child_conn, self.parent_conn = multiprocessing.Pipe() 796 797 def run(self): 798 self.parent_conn.close() 799 for s in iter(self.child_conn.recv, None): 800 self.child_conn.send(s.upper()) 801 self.child_conn.close() 802 803 def submit(self, s): 804 assert type(s) is str 805 self.parent_conn.send(s) 806 return self.parent_conn.recv() 807 808 def stop(self): 809 self.parent_conn.send(None) 810 self.parent_conn.close() 811 self.child_conn.close() 812 813class _TestSubclassingProcess(BaseTestCase): 814 815 ALLOWED_TYPES = ('processes',) 816 817 def test_subclassing(self): 818 uppercaser = _UpperCaser() 819 uppercaser.daemon = True 820 uppercaser.start() 821 self.assertEqual(uppercaser.submit('hello'), 'HELLO') 822 self.assertEqual(uppercaser.submit('world'), 'WORLD') 823 uppercaser.stop() 824 uppercaser.join() 825 826 def test_stderr_flush(self): 827 # sys.stderr is flushed at process shutdown (issue #13812) 828 if self.TYPE == "threads": 829 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 830 831 testfn = os_helper.TESTFN 832 self.addCleanup(os_helper.unlink, testfn) 833 proc = self.Process(target=self._test_stderr_flush, args=(testfn,)) 834 proc.start() 835 proc.join() 836 with open(testfn, encoding="utf-8") as f: 837 err = f.read() 838 # The whole traceback was printed 839 self.assertIn("ZeroDivisionError", err) 840 self.assertIn("test_multiprocessing.py", err) 841 self.assertIn("1/0 # MARKER", err) 842 843 @classmethod 844 def _test_stderr_flush(cls, testfn): 845 fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL) 846 sys.stderr = open(fd, 'w', encoding="utf-8", closefd=False) 847 1/0 # MARKER 848 849 850 @classmethod 851 def _test_sys_exit(cls, reason, testfn): 852 fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL) 853 sys.stderr = open(fd, 'w', encoding="utf-8", closefd=False) 854 sys.exit(reason) 855 856 def test_sys_exit(self): 857 # See Issue 13854 858 if self.TYPE == 'threads': 859 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 860 861 testfn = os_helper.TESTFN 862 self.addCleanup(os_helper.unlink, testfn) 863 864 for reason in ( 865 [1, 2, 3], 866 'ignore this', 867 ): 868 p = self.Process(target=self._test_sys_exit, args=(reason, testfn)) 869 p.daemon = True 870 p.start() 871 join_process(p) 872 self.assertEqual(p.exitcode, 1) 873 874 with open(testfn, encoding="utf-8") as f: 875 content = f.read() 876 self.assertEqual(content.rstrip(), str(reason)) 877 878 os.unlink(testfn) 879 880 cases = [ 881 ((True,), 1), 882 ((False,), 0), 883 ((8,), 8), 884 ((None,), 0), 885 ((), 0), 886 ] 887 888 for args, expected in cases: 889 with self.subTest(args=args): 890 p = self.Process(target=sys.exit, args=args) 891 p.daemon = True 892 p.start() 893 join_process(p) 894 self.assertEqual(p.exitcode, expected) 895 896# 897# 898# 899 900def queue_empty(q): 901 if hasattr(q, 'empty'): 902 return q.empty() 903 else: 904 return q.qsize() == 0 905 906def queue_full(q, maxsize): 907 if hasattr(q, 'full'): 908 return q.full() 909 else: 910 return q.qsize() == maxsize 911 912 913class _TestQueue(BaseTestCase): 914 915 916 @classmethod 917 def _test_put(cls, queue, child_can_start, parent_can_continue): 918 child_can_start.wait() 919 for i in range(6): 920 queue.get() 921 parent_can_continue.set() 922 923 def test_put(self): 924 MAXSIZE = 6 925 queue = self.Queue(maxsize=MAXSIZE) 926 child_can_start = self.Event() 927 parent_can_continue = self.Event() 928 929 proc = self.Process( 930 target=self._test_put, 931 args=(queue, child_can_start, parent_can_continue) 932 ) 933 proc.daemon = True 934 proc.start() 935 936 self.assertEqual(queue_empty(queue), True) 937 self.assertEqual(queue_full(queue, MAXSIZE), False) 938 939 queue.put(1) 940 queue.put(2, True) 941 queue.put(3, True, None) 942 queue.put(4, False) 943 queue.put(5, False, None) 944 queue.put_nowait(6) 945 946 # the values may be in buffer but not yet in pipe so sleep a bit 947 time.sleep(DELTA) 948 949 self.assertEqual(queue_empty(queue), False) 950 self.assertEqual(queue_full(queue, MAXSIZE), True) 951 952 put = TimingWrapper(queue.put) 953 put_nowait = TimingWrapper(queue.put_nowait) 954 955 self.assertRaises(pyqueue.Full, put, 7, False) 956 self.assertTimingAlmostEqual(put.elapsed, 0) 957 958 self.assertRaises(pyqueue.Full, put, 7, False, None) 959 self.assertTimingAlmostEqual(put.elapsed, 0) 960 961 self.assertRaises(pyqueue.Full, put_nowait, 7) 962 self.assertTimingAlmostEqual(put_nowait.elapsed, 0) 963 964 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1) 965 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1) 966 967 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2) 968 self.assertTimingAlmostEqual(put.elapsed, 0) 969 970 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3) 971 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3) 972 973 child_can_start.set() 974 parent_can_continue.wait() 975 976 self.assertEqual(queue_empty(queue), True) 977 self.assertEqual(queue_full(queue, MAXSIZE), False) 978 979 proc.join() 980 close_queue(queue) 981 982 @classmethod 983 def _test_get(cls, queue, child_can_start, parent_can_continue): 984 child_can_start.wait() 985 #queue.put(1) 986 queue.put(2) 987 queue.put(3) 988 queue.put(4) 989 queue.put(5) 990 parent_can_continue.set() 991 992 def test_get(self): 993 queue = self.Queue() 994 child_can_start = self.Event() 995 parent_can_continue = self.Event() 996 997 proc = self.Process( 998 target=self._test_get, 999 args=(queue, child_can_start, parent_can_continue) 1000 ) 1001 proc.daemon = True 1002 proc.start() 1003 1004 self.assertEqual(queue_empty(queue), True) 1005 1006 child_can_start.set() 1007 parent_can_continue.wait() 1008 1009 time.sleep(DELTA) 1010 self.assertEqual(queue_empty(queue), False) 1011 1012 # Hangs unexpectedly, remove for now 1013 #self.assertEqual(queue.get(), 1) 1014 self.assertEqual(queue.get(True, None), 2) 1015 self.assertEqual(queue.get(True), 3) 1016 self.assertEqual(queue.get(timeout=1), 4) 1017 self.assertEqual(queue.get_nowait(), 5) 1018 1019 self.assertEqual(queue_empty(queue), True) 1020 1021 get = TimingWrapper(queue.get) 1022 get_nowait = TimingWrapper(queue.get_nowait) 1023 1024 self.assertRaises(pyqueue.Empty, get, False) 1025 self.assertTimingAlmostEqual(get.elapsed, 0) 1026 1027 self.assertRaises(pyqueue.Empty, get, False, None) 1028 self.assertTimingAlmostEqual(get.elapsed, 0) 1029 1030 self.assertRaises(pyqueue.Empty, get_nowait) 1031 self.assertTimingAlmostEqual(get_nowait.elapsed, 0) 1032 1033 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1) 1034 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1) 1035 1036 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2) 1037 self.assertTimingAlmostEqual(get.elapsed, 0) 1038 1039 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3) 1040 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3) 1041 1042 proc.join() 1043 close_queue(queue) 1044 1045 @classmethod 1046 def _test_fork(cls, queue): 1047 for i in range(10, 20): 1048 queue.put(i) 1049 # note that at this point the items may only be buffered, so the 1050 # process cannot shutdown until the feeder thread has finished 1051 # pushing items onto the pipe. 1052 1053 def test_fork(self): 1054 # Old versions of Queue would fail to create a new feeder 1055 # thread for a forked process if the original process had its 1056 # own feeder thread. This test checks that this no longer 1057 # happens. 1058 1059 queue = self.Queue() 1060 1061 # put items on queue so that main process starts a feeder thread 1062 for i in range(10): 1063 queue.put(i) 1064 1065 # wait to make sure thread starts before we fork a new process 1066 time.sleep(DELTA) 1067 1068 # fork process 1069 p = self.Process(target=self._test_fork, args=(queue,)) 1070 p.daemon = True 1071 p.start() 1072 1073 # check that all expected items are in the queue 1074 for i in range(20): 1075 self.assertEqual(queue.get(), i) 1076 self.assertRaises(pyqueue.Empty, queue.get, False) 1077 1078 p.join() 1079 close_queue(queue) 1080 1081 def test_qsize(self): 1082 q = self.Queue() 1083 try: 1084 self.assertEqual(q.qsize(), 0) 1085 except NotImplementedError: 1086 self.skipTest('qsize method not implemented') 1087 q.put(1) 1088 self.assertEqual(q.qsize(), 1) 1089 q.put(5) 1090 self.assertEqual(q.qsize(), 2) 1091 q.get() 1092 self.assertEqual(q.qsize(), 1) 1093 q.get() 1094 self.assertEqual(q.qsize(), 0) 1095 close_queue(q) 1096 1097 @classmethod 1098 def _test_task_done(cls, q): 1099 for obj in iter(q.get, None): 1100 time.sleep(DELTA) 1101 q.task_done() 1102 1103 def test_task_done(self): 1104 queue = self.JoinableQueue() 1105 1106 workers = [self.Process(target=self._test_task_done, args=(queue,)) 1107 for i in range(4)] 1108 1109 for p in workers: 1110 p.daemon = True 1111 p.start() 1112 1113 for i in range(10): 1114 queue.put(i) 1115 1116 queue.join() 1117 1118 for p in workers: 1119 queue.put(None) 1120 1121 for p in workers: 1122 p.join() 1123 close_queue(queue) 1124 1125 def test_no_import_lock_contention(self): 1126 with os_helper.temp_cwd(): 1127 module_name = 'imported_by_an_imported_module' 1128 with open(module_name + '.py', 'w', encoding="utf-8") as f: 1129 f.write("""if 1: 1130 import multiprocessing 1131 1132 q = multiprocessing.Queue() 1133 q.put('knock knock') 1134 q.get(timeout=3) 1135 q.close() 1136 del q 1137 """) 1138 1139 with import_helper.DirsOnSysPath(os.getcwd()): 1140 try: 1141 __import__(module_name) 1142 except pyqueue.Empty: 1143 self.fail("Probable regression on import lock contention;" 1144 " see Issue #22853") 1145 1146 def test_timeout(self): 1147 q = multiprocessing.Queue() 1148 start = time.monotonic() 1149 self.assertRaises(pyqueue.Empty, q.get, True, 0.200) 1150 delta = time.monotonic() - start 1151 # bpo-30317: Tolerate a delta of 100 ms because of the bad clock 1152 # resolution on Windows (usually 15.6 ms). x86 Windows7 3.x once 1153 # failed because the delta was only 135.8 ms. 1154 self.assertGreaterEqual(delta, 0.100) 1155 close_queue(q) 1156 1157 def test_queue_feeder_donot_stop_onexc(self): 1158 # bpo-30414: verify feeder handles exceptions correctly 1159 if self.TYPE != 'processes': 1160 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1161 1162 class NotSerializable(object): 1163 def __reduce__(self): 1164 raise AttributeError 1165 with test.support.captured_stderr(): 1166 q = self.Queue() 1167 q.put(NotSerializable()) 1168 q.put(True) 1169 self.assertTrue(q.get(timeout=support.SHORT_TIMEOUT)) 1170 close_queue(q) 1171 1172 with test.support.captured_stderr(): 1173 # bpo-33078: verify that the queue size is correctly handled 1174 # on errors. 1175 q = self.Queue(maxsize=1) 1176 q.put(NotSerializable()) 1177 q.put(True) 1178 try: 1179 self.assertEqual(q.qsize(), 1) 1180 except NotImplementedError: 1181 # qsize is not available on all platform as it 1182 # relies on sem_getvalue 1183 pass 1184 self.assertTrue(q.get(timeout=support.SHORT_TIMEOUT)) 1185 # Check that the size of the queue is correct 1186 self.assertTrue(q.empty()) 1187 close_queue(q) 1188 1189 def test_queue_feeder_on_queue_feeder_error(self): 1190 # bpo-30006: verify feeder handles exceptions using the 1191 # _on_queue_feeder_error hook. 1192 if self.TYPE != 'processes': 1193 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1194 1195 class NotSerializable(object): 1196 """Mock unserializable object""" 1197 def __init__(self): 1198 self.reduce_was_called = False 1199 self.on_queue_feeder_error_was_called = False 1200 1201 def __reduce__(self): 1202 self.reduce_was_called = True 1203 raise AttributeError 1204 1205 class SafeQueue(multiprocessing.queues.Queue): 1206 """Queue with overloaded _on_queue_feeder_error hook""" 1207 @staticmethod 1208 def _on_queue_feeder_error(e, obj): 1209 if (isinstance(e, AttributeError) and 1210 isinstance(obj, NotSerializable)): 1211 obj.on_queue_feeder_error_was_called = True 1212 1213 not_serializable_obj = NotSerializable() 1214 # The captured_stderr reduces the noise in the test report 1215 with test.support.captured_stderr(): 1216 q = SafeQueue(ctx=multiprocessing.get_context()) 1217 q.put(not_serializable_obj) 1218 1219 # Verify that q is still functioning correctly 1220 q.put(True) 1221 self.assertTrue(q.get(timeout=support.SHORT_TIMEOUT)) 1222 1223 # Assert that the serialization and the hook have been called correctly 1224 self.assertTrue(not_serializable_obj.reduce_was_called) 1225 self.assertTrue(not_serializable_obj.on_queue_feeder_error_was_called) 1226 1227 def test_closed_queue_put_get_exceptions(self): 1228 for q in multiprocessing.Queue(), multiprocessing.JoinableQueue(): 1229 q.close() 1230 with self.assertRaisesRegex(ValueError, 'is closed'): 1231 q.put('foo') 1232 with self.assertRaisesRegex(ValueError, 'is closed'): 1233 q.get() 1234# 1235# 1236# 1237 1238class _TestLock(BaseTestCase): 1239 1240 def test_lock(self): 1241 lock = self.Lock() 1242 self.assertEqual(lock.acquire(), True) 1243 self.assertEqual(lock.acquire(False), False) 1244 self.assertEqual(lock.release(), None) 1245 self.assertRaises((ValueError, threading.ThreadError), lock.release) 1246 1247 def test_rlock(self): 1248 lock = self.RLock() 1249 self.assertEqual(lock.acquire(), True) 1250 self.assertEqual(lock.acquire(), True) 1251 self.assertEqual(lock.acquire(), True) 1252 self.assertEqual(lock.release(), None) 1253 self.assertEqual(lock.release(), None) 1254 self.assertEqual(lock.release(), None) 1255 self.assertRaises((AssertionError, RuntimeError), lock.release) 1256 1257 def test_lock_context(self): 1258 with self.Lock(): 1259 pass 1260 1261 1262class _TestSemaphore(BaseTestCase): 1263 1264 def _test_semaphore(self, sem): 1265 self.assertReturnsIfImplemented(2, get_value, sem) 1266 self.assertEqual(sem.acquire(), True) 1267 self.assertReturnsIfImplemented(1, get_value, sem) 1268 self.assertEqual(sem.acquire(), True) 1269 self.assertReturnsIfImplemented(0, get_value, sem) 1270 self.assertEqual(sem.acquire(False), False) 1271 self.assertReturnsIfImplemented(0, get_value, sem) 1272 self.assertEqual(sem.release(), None) 1273 self.assertReturnsIfImplemented(1, get_value, sem) 1274 self.assertEqual(sem.release(), None) 1275 self.assertReturnsIfImplemented(2, get_value, sem) 1276 1277 def test_semaphore(self): 1278 sem = self.Semaphore(2) 1279 self._test_semaphore(sem) 1280 self.assertEqual(sem.release(), None) 1281 self.assertReturnsIfImplemented(3, get_value, sem) 1282 self.assertEqual(sem.release(), None) 1283 self.assertReturnsIfImplemented(4, get_value, sem) 1284 1285 def test_bounded_semaphore(self): 1286 sem = self.BoundedSemaphore(2) 1287 self._test_semaphore(sem) 1288 # Currently fails on OS/X 1289 #if HAVE_GETVALUE: 1290 # self.assertRaises(ValueError, sem.release) 1291 # self.assertReturnsIfImplemented(2, get_value, sem) 1292 1293 def test_timeout(self): 1294 if self.TYPE != 'processes': 1295 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1296 1297 sem = self.Semaphore(0) 1298 acquire = TimingWrapper(sem.acquire) 1299 1300 self.assertEqual(acquire(False), False) 1301 self.assertTimingAlmostEqual(acquire.elapsed, 0.0) 1302 1303 self.assertEqual(acquire(False, None), False) 1304 self.assertTimingAlmostEqual(acquire.elapsed, 0.0) 1305 1306 self.assertEqual(acquire(False, TIMEOUT1), False) 1307 self.assertTimingAlmostEqual(acquire.elapsed, 0) 1308 1309 self.assertEqual(acquire(True, TIMEOUT2), False) 1310 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2) 1311 1312 self.assertEqual(acquire(timeout=TIMEOUT3), False) 1313 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3) 1314 1315 1316class _TestCondition(BaseTestCase): 1317 1318 @classmethod 1319 def f(cls, cond, sleeping, woken, timeout=None): 1320 cond.acquire() 1321 sleeping.release() 1322 cond.wait(timeout) 1323 woken.release() 1324 cond.release() 1325 1326 def assertReachesEventually(self, func, value): 1327 for i in range(10): 1328 try: 1329 if func() == value: 1330 break 1331 except NotImplementedError: 1332 break 1333 time.sleep(DELTA) 1334 time.sleep(DELTA) 1335 self.assertReturnsIfImplemented(value, func) 1336 1337 def check_invariant(self, cond): 1338 # this is only supposed to succeed when there are no sleepers 1339 if self.TYPE == 'processes': 1340 try: 1341 sleepers = (cond._sleeping_count.get_value() - 1342 cond._woken_count.get_value()) 1343 self.assertEqual(sleepers, 0) 1344 self.assertEqual(cond._wait_semaphore.get_value(), 0) 1345 except NotImplementedError: 1346 pass 1347 1348 def test_notify(self): 1349 cond = self.Condition() 1350 sleeping = self.Semaphore(0) 1351 woken = self.Semaphore(0) 1352 1353 p = self.Process(target=self.f, args=(cond, sleeping, woken)) 1354 p.daemon = True 1355 p.start() 1356 self.addCleanup(p.join) 1357 1358 p = threading.Thread(target=self.f, args=(cond, sleeping, woken)) 1359 p.daemon = True 1360 p.start() 1361 self.addCleanup(p.join) 1362 1363 # wait for both children to start sleeping 1364 sleeping.acquire() 1365 sleeping.acquire() 1366 1367 # check no process/thread has woken up 1368 time.sleep(DELTA) 1369 self.assertReturnsIfImplemented(0, get_value, woken) 1370 1371 # wake up one process/thread 1372 cond.acquire() 1373 cond.notify() 1374 cond.release() 1375 1376 # check one process/thread has woken up 1377 time.sleep(DELTA) 1378 self.assertReturnsIfImplemented(1, get_value, woken) 1379 1380 # wake up another 1381 cond.acquire() 1382 cond.notify() 1383 cond.release() 1384 1385 # check other has woken up 1386 time.sleep(DELTA) 1387 self.assertReturnsIfImplemented(2, get_value, woken) 1388 1389 # check state is not mucked up 1390 self.check_invariant(cond) 1391 p.join() 1392 1393 def test_notify_all(self): 1394 cond = self.Condition() 1395 sleeping = self.Semaphore(0) 1396 woken = self.Semaphore(0) 1397 1398 # start some threads/processes which will timeout 1399 for i in range(3): 1400 p = self.Process(target=self.f, 1401 args=(cond, sleeping, woken, TIMEOUT1)) 1402 p.daemon = True 1403 p.start() 1404 self.addCleanup(p.join) 1405 1406 t = threading.Thread(target=self.f, 1407 args=(cond, sleeping, woken, TIMEOUT1)) 1408 t.daemon = True 1409 t.start() 1410 self.addCleanup(t.join) 1411 1412 # wait for them all to sleep 1413 for i in range(6): 1414 sleeping.acquire() 1415 1416 # check they have all timed out 1417 for i in range(6): 1418 woken.acquire() 1419 self.assertReturnsIfImplemented(0, get_value, woken) 1420 1421 # check state is not mucked up 1422 self.check_invariant(cond) 1423 1424 # start some more threads/processes 1425 for i in range(3): 1426 p = self.Process(target=self.f, args=(cond, sleeping, woken)) 1427 p.daemon = True 1428 p.start() 1429 self.addCleanup(p.join) 1430 1431 t = threading.Thread(target=self.f, args=(cond, sleeping, woken)) 1432 t.daemon = True 1433 t.start() 1434 self.addCleanup(t.join) 1435 1436 # wait for them to all sleep 1437 for i in range(6): 1438 sleeping.acquire() 1439 1440 # check no process/thread has woken up 1441 time.sleep(DELTA) 1442 self.assertReturnsIfImplemented(0, get_value, woken) 1443 1444 # wake them all up 1445 cond.acquire() 1446 cond.notify_all() 1447 cond.release() 1448 1449 # check they have all woken 1450 self.assertReachesEventually(lambda: get_value(woken), 6) 1451 1452 # check state is not mucked up 1453 self.check_invariant(cond) 1454 1455 def test_notify_n(self): 1456 cond = self.Condition() 1457 sleeping = self.Semaphore(0) 1458 woken = self.Semaphore(0) 1459 1460 # start some threads/processes 1461 for i in range(3): 1462 p = self.Process(target=self.f, args=(cond, sleeping, woken)) 1463 p.daemon = True 1464 p.start() 1465 self.addCleanup(p.join) 1466 1467 t = threading.Thread(target=self.f, args=(cond, sleeping, woken)) 1468 t.daemon = True 1469 t.start() 1470 self.addCleanup(t.join) 1471 1472 # wait for them to all sleep 1473 for i in range(6): 1474 sleeping.acquire() 1475 1476 # check no process/thread has woken up 1477 time.sleep(DELTA) 1478 self.assertReturnsIfImplemented(0, get_value, woken) 1479 1480 # wake some of them up 1481 cond.acquire() 1482 cond.notify(n=2) 1483 cond.release() 1484 1485 # check 2 have woken 1486 self.assertReachesEventually(lambda: get_value(woken), 2) 1487 1488 # wake the rest of them 1489 cond.acquire() 1490 cond.notify(n=4) 1491 cond.release() 1492 1493 self.assertReachesEventually(lambda: get_value(woken), 6) 1494 1495 # doesn't do anything more 1496 cond.acquire() 1497 cond.notify(n=3) 1498 cond.release() 1499 1500 self.assertReturnsIfImplemented(6, get_value, woken) 1501 1502 # check state is not mucked up 1503 self.check_invariant(cond) 1504 1505 def test_timeout(self): 1506 cond = self.Condition() 1507 wait = TimingWrapper(cond.wait) 1508 cond.acquire() 1509 res = wait(TIMEOUT1) 1510 cond.release() 1511 self.assertEqual(res, False) 1512 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1) 1513 1514 @classmethod 1515 def _test_waitfor_f(cls, cond, state): 1516 with cond: 1517 state.value = 0 1518 cond.notify() 1519 result = cond.wait_for(lambda : state.value==4) 1520 if not result or state.value != 4: 1521 sys.exit(1) 1522 1523 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes') 1524 def test_waitfor(self): 1525 # based on test in test/lock_tests.py 1526 cond = self.Condition() 1527 state = self.Value('i', -1) 1528 1529 p = self.Process(target=self._test_waitfor_f, args=(cond, state)) 1530 p.daemon = True 1531 p.start() 1532 1533 with cond: 1534 result = cond.wait_for(lambda : state.value==0) 1535 self.assertTrue(result) 1536 self.assertEqual(state.value, 0) 1537 1538 for i in range(4): 1539 time.sleep(0.01) 1540 with cond: 1541 state.value += 1 1542 cond.notify() 1543 1544 join_process(p) 1545 self.assertEqual(p.exitcode, 0) 1546 1547 @classmethod 1548 def _test_waitfor_timeout_f(cls, cond, state, success, sem): 1549 sem.release() 1550 with cond: 1551 expected = 0.1 1552 dt = time.monotonic() 1553 result = cond.wait_for(lambda : state.value==4, timeout=expected) 1554 dt = time.monotonic() - dt 1555 # borrow logic in assertTimeout() from test/lock_tests.py 1556 if not result and expected * 0.6 < dt < expected * 10.0: 1557 success.value = True 1558 1559 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes') 1560 def test_waitfor_timeout(self): 1561 # based on test in test/lock_tests.py 1562 cond = self.Condition() 1563 state = self.Value('i', 0) 1564 success = self.Value('i', False) 1565 sem = self.Semaphore(0) 1566 1567 p = self.Process(target=self._test_waitfor_timeout_f, 1568 args=(cond, state, success, sem)) 1569 p.daemon = True 1570 p.start() 1571 self.assertTrue(sem.acquire(timeout=support.LONG_TIMEOUT)) 1572 1573 # Only increment 3 times, so state == 4 is never reached. 1574 for i in range(3): 1575 time.sleep(0.01) 1576 with cond: 1577 state.value += 1 1578 cond.notify() 1579 1580 join_process(p) 1581 self.assertTrue(success.value) 1582 1583 @classmethod 1584 def _test_wait_result(cls, c, pid): 1585 with c: 1586 c.notify() 1587 time.sleep(1) 1588 if pid is not None: 1589 os.kill(pid, signal.SIGINT) 1590 1591 def test_wait_result(self): 1592 if isinstance(self, ProcessesMixin) and sys.platform != 'win32': 1593 pid = os.getpid() 1594 else: 1595 pid = None 1596 1597 c = self.Condition() 1598 with c: 1599 self.assertFalse(c.wait(0)) 1600 self.assertFalse(c.wait(0.1)) 1601 1602 p = self.Process(target=self._test_wait_result, args=(c, pid)) 1603 p.start() 1604 1605 self.assertTrue(c.wait(60)) 1606 if pid is not None: 1607 self.assertRaises(KeyboardInterrupt, c.wait, 60) 1608 1609 p.join() 1610 1611 1612class _TestEvent(BaseTestCase): 1613 1614 @classmethod 1615 def _test_event(cls, event): 1616 time.sleep(TIMEOUT2) 1617 event.set() 1618 1619 def test_event(self): 1620 event = self.Event() 1621 wait = TimingWrapper(event.wait) 1622 1623 # Removed temporarily, due to API shear, this does not 1624 # work with threading._Event objects. is_set == isSet 1625 self.assertEqual(event.is_set(), False) 1626 1627 # Removed, threading.Event.wait() will return the value of the __flag 1628 # instead of None. API Shear with the semaphore backed mp.Event 1629 self.assertEqual(wait(0.0), False) 1630 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 1631 self.assertEqual(wait(TIMEOUT1), False) 1632 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1) 1633 1634 event.set() 1635 1636 # See note above on the API differences 1637 self.assertEqual(event.is_set(), True) 1638 self.assertEqual(wait(), True) 1639 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 1640 self.assertEqual(wait(TIMEOUT1), True) 1641 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 1642 # self.assertEqual(event.is_set(), True) 1643 1644 event.clear() 1645 1646 #self.assertEqual(event.is_set(), False) 1647 1648 p = self.Process(target=self._test_event, args=(event,)) 1649 p.daemon = True 1650 p.start() 1651 self.assertEqual(wait(), True) 1652 p.join() 1653 1654# 1655# Tests for Barrier - adapted from tests in test/lock_tests.py 1656# 1657 1658# Many of the tests for threading.Barrier use a list as an atomic 1659# counter: a value is appended to increment the counter, and the 1660# length of the list gives the value. We use the class DummyList 1661# for the same purpose. 1662 1663class _DummyList(object): 1664 1665 def __init__(self): 1666 wrapper = multiprocessing.heap.BufferWrapper(struct.calcsize('i')) 1667 lock = multiprocessing.Lock() 1668 self.__setstate__((wrapper, lock)) 1669 self._lengthbuf[0] = 0 1670 1671 def __setstate__(self, state): 1672 (self._wrapper, self._lock) = state 1673 self._lengthbuf = self._wrapper.create_memoryview().cast('i') 1674 1675 def __getstate__(self): 1676 return (self._wrapper, self._lock) 1677 1678 def append(self, _): 1679 with self._lock: 1680 self._lengthbuf[0] += 1 1681 1682 def __len__(self): 1683 with self._lock: 1684 return self._lengthbuf[0] 1685 1686def _wait(): 1687 # A crude wait/yield function not relying on synchronization primitives. 1688 time.sleep(0.01) 1689 1690 1691class Bunch(object): 1692 """ 1693 A bunch of threads. 1694 """ 1695 def __init__(self, namespace, f, args, n, wait_before_exit=False): 1696 """ 1697 Construct a bunch of `n` threads running the same function `f`. 1698 If `wait_before_exit` is True, the threads won't terminate until 1699 do_finish() is called. 1700 """ 1701 self.f = f 1702 self.args = args 1703 self.n = n 1704 self.started = namespace.DummyList() 1705 self.finished = namespace.DummyList() 1706 self._can_exit = namespace.Event() 1707 if not wait_before_exit: 1708 self._can_exit.set() 1709 1710 threads = [] 1711 for i in range(n): 1712 p = namespace.Process(target=self.task) 1713 p.daemon = True 1714 p.start() 1715 threads.append(p) 1716 1717 def finalize(threads): 1718 for p in threads: 1719 p.join() 1720 1721 self._finalizer = weakref.finalize(self, finalize, threads) 1722 1723 def task(self): 1724 pid = os.getpid() 1725 self.started.append(pid) 1726 try: 1727 self.f(*self.args) 1728 finally: 1729 self.finished.append(pid) 1730 self._can_exit.wait(30) 1731 assert self._can_exit.is_set() 1732 1733 def wait_for_started(self): 1734 while len(self.started) < self.n: 1735 _wait() 1736 1737 def wait_for_finished(self): 1738 while len(self.finished) < self.n: 1739 _wait() 1740 1741 def do_finish(self): 1742 self._can_exit.set() 1743 1744 def close(self): 1745 self._finalizer() 1746 1747 1748class AppendTrue(object): 1749 def __init__(self, obj): 1750 self.obj = obj 1751 def __call__(self): 1752 self.obj.append(True) 1753 1754 1755class _TestBarrier(BaseTestCase): 1756 """ 1757 Tests for Barrier objects. 1758 """ 1759 N = 5 1760 defaultTimeout = 30.0 # XXX Slow Windows buildbots need generous timeout 1761 1762 def setUp(self): 1763 self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout) 1764 1765 def tearDown(self): 1766 self.barrier.abort() 1767 self.barrier = None 1768 1769 def DummyList(self): 1770 if self.TYPE == 'threads': 1771 return [] 1772 elif self.TYPE == 'manager': 1773 return self.manager.list() 1774 else: 1775 return _DummyList() 1776 1777 def run_threads(self, f, args): 1778 b = Bunch(self, f, args, self.N-1) 1779 try: 1780 f(*args) 1781 b.wait_for_finished() 1782 finally: 1783 b.close() 1784 1785 @classmethod 1786 def multipass(cls, barrier, results, n): 1787 m = barrier.parties 1788 assert m == cls.N 1789 for i in range(n): 1790 results[0].append(True) 1791 assert len(results[1]) == i * m 1792 barrier.wait() 1793 results[1].append(True) 1794 assert len(results[0]) == (i + 1) * m 1795 barrier.wait() 1796 try: 1797 assert barrier.n_waiting == 0 1798 except NotImplementedError: 1799 pass 1800 assert not barrier.broken 1801 1802 def test_barrier(self, passes=1): 1803 """ 1804 Test that a barrier is passed in lockstep 1805 """ 1806 results = [self.DummyList(), self.DummyList()] 1807 self.run_threads(self.multipass, (self.barrier, results, passes)) 1808 1809 def test_barrier_10(self): 1810 """ 1811 Test that a barrier works for 10 consecutive runs 1812 """ 1813 return self.test_barrier(10) 1814 1815 @classmethod 1816 def _test_wait_return_f(cls, barrier, queue): 1817 res = barrier.wait() 1818 queue.put(res) 1819 1820 def test_wait_return(self): 1821 """ 1822 test the return value from barrier.wait 1823 """ 1824 queue = self.Queue() 1825 self.run_threads(self._test_wait_return_f, (self.barrier, queue)) 1826 results = [queue.get() for i in range(self.N)] 1827 self.assertEqual(results.count(0), 1) 1828 close_queue(queue) 1829 1830 @classmethod 1831 def _test_action_f(cls, barrier, results): 1832 barrier.wait() 1833 if len(results) != 1: 1834 raise RuntimeError 1835 1836 def test_action(self): 1837 """ 1838 Test the 'action' callback 1839 """ 1840 results = self.DummyList() 1841 barrier = self.Barrier(self.N, action=AppendTrue(results)) 1842 self.run_threads(self._test_action_f, (barrier, results)) 1843 self.assertEqual(len(results), 1) 1844 1845 @classmethod 1846 def _test_abort_f(cls, barrier, results1, results2): 1847 try: 1848 i = barrier.wait() 1849 if i == cls.N//2: 1850 raise RuntimeError 1851 barrier.wait() 1852 results1.append(True) 1853 except threading.BrokenBarrierError: 1854 results2.append(True) 1855 except RuntimeError: 1856 barrier.abort() 1857 1858 def test_abort(self): 1859 """ 1860 Test that an abort will put the barrier in a broken state 1861 """ 1862 results1 = self.DummyList() 1863 results2 = self.DummyList() 1864 self.run_threads(self._test_abort_f, 1865 (self.barrier, results1, results2)) 1866 self.assertEqual(len(results1), 0) 1867 self.assertEqual(len(results2), self.N-1) 1868 self.assertTrue(self.barrier.broken) 1869 1870 @classmethod 1871 def _test_reset_f(cls, barrier, results1, results2, results3): 1872 i = barrier.wait() 1873 if i == cls.N//2: 1874 # Wait until the other threads are all in the barrier. 1875 while barrier.n_waiting < cls.N-1: 1876 time.sleep(0.001) 1877 barrier.reset() 1878 else: 1879 try: 1880 barrier.wait() 1881 results1.append(True) 1882 except threading.BrokenBarrierError: 1883 results2.append(True) 1884 # Now, pass the barrier again 1885 barrier.wait() 1886 results3.append(True) 1887 1888 def test_reset(self): 1889 """ 1890 Test that a 'reset' on a barrier frees the waiting threads 1891 """ 1892 results1 = self.DummyList() 1893 results2 = self.DummyList() 1894 results3 = self.DummyList() 1895 self.run_threads(self._test_reset_f, 1896 (self.barrier, results1, results2, results3)) 1897 self.assertEqual(len(results1), 0) 1898 self.assertEqual(len(results2), self.N-1) 1899 self.assertEqual(len(results3), self.N) 1900 1901 @classmethod 1902 def _test_abort_and_reset_f(cls, barrier, barrier2, 1903 results1, results2, results3): 1904 try: 1905 i = barrier.wait() 1906 if i == cls.N//2: 1907 raise RuntimeError 1908 barrier.wait() 1909 results1.append(True) 1910 except threading.BrokenBarrierError: 1911 results2.append(True) 1912 except RuntimeError: 1913 barrier.abort() 1914 # Synchronize and reset the barrier. Must synchronize first so 1915 # that everyone has left it when we reset, and after so that no 1916 # one enters it before the reset. 1917 if barrier2.wait() == cls.N//2: 1918 barrier.reset() 1919 barrier2.wait() 1920 barrier.wait() 1921 results3.append(True) 1922 1923 def test_abort_and_reset(self): 1924 """ 1925 Test that a barrier can be reset after being broken. 1926 """ 1927 results1 = self.DummyList() 1928 results2 = self.DummyList() 1929 results3 = self.DummyList() 1930 barrier2 = self.Barrier(self.N) 1931 1932 self.run_threads(self._test_abort_and_reset_f, 1933 (self.barrier, barrier2, results1, results2, results3)) 1934 self.assertEqual(len(results1), 0) 1935 self.assertEqual(len(results2), self.N-1) 1936 self.assertEqual(len(results3), self.N) 1937 1938 @classmethod 1939 def _test_timeout_f(cls, barrier, results): 1940 i = barrier.wait() 1941 if i == cls.N//2: 1942 # One thread is late! 1943 time.sleep(1.0) 1944 try: 1945 barrier.wait(0.5) 1946 except threading.BrokenBarrierError: 1947 results.append(True) 1948 1949 def test_timeout(self): 1950 """ 1951 Test wait(timeout) 1952 """ 1953 results = self.DummyList() 1954 self.run_threads(self._test_timeout_f, (self.barrier, results)) 1955 self.assertEqual(len(results), self.barrier.parties) 1956 1957 @classmethod 1958 def _test_default_timeout_f(cls, barrier, results): 1959 i = barrier.wait(cls.defaultTimeout) 1960 if i == cls.N//2: 1961 # One thread is later than the default timeout 1962 time.sleep(1.0) 1963 try: 1964 barrier.wait() 1965 except threading.BrokenBarrierError: 1966 results.append(True) 1967 1968 def test_default_timeout(self): 1969 """ 1970 Test the barrier's default timeout 1971 """ 1972 barrier = self.Barrier(self.N, timeout=0.5) 1973 results = self.DummyList() 1974 self.run_threads(self._test_default_timeout_f, (barrier, results)) 1975 self.assertEqual(len(results), barrier.parties) 1976 1977 def test_single_thread(self): 1978 b = self.Barrier(1) 1979 b.wait() 1980 b.wait() 1981 1982 @classmethod 1983 def _test_thousand_f(cls, barrier, passes, conn, lock): 1984 for i in range(passes): 1985 barrier.wait() 1986 with lock: 1987 conn.send(i) 1988 1989 def test_thousand(self): 1990 if self.TYPE == 'manager': 1991 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1992 passes = 1000 1993 lock = self.Lock() 1994 conn, child_conn = self.Pipe(False) 1995 for j in range(self.N): 1996 p = self.Process(target=self._test_thousand_f, 1997 args=(self.barrier, passes, child_conn, lock)) 1998 p.start() 1999 self.addCleanup(p.join) 2000 2001 for i in range(passes): 2002 for j in range(self.N): 2003 self.assertEqual(conn.recv(), i) 2004 2005# 2006# 2007# 2008 2009class _TestValue(BaseTestCase): 2010 2011 ALLOWED_TYPES = ('processes',) 2012 2013 codes_values = [ 2014 ('i', 4343, 24234), 2015 ('d', 3.625, -4.25), 2016 ('h', -232, 234), 2017 ('q', 2 ** 33, 2 ** 34), 2018 ('c', latin('x'), latin('y')) 2019 ] 2020 2021 def setUp(self): 2022 if not HAS_SHAREDCTYPES: 2023 self.skipTest("requires multiprocessing.sharedctypes") 2024 2025 @classmethod 2026 def _test(cls, values): 2027 for sv, cv in zip(values, cls.codes_values): 2028 sv.value = cv[2] 2029 2030 2031 def test_value(self, raw=False): 2032 if raw: 2033 values = [self.RawValue(code, value) 2034 for code, value, _ in self.codes_values] 2035 else: 2036 values = [self.Value(code, value) 2037 for code, value, _ in self.codes_values] 2038 2039 for sv, cv in zip(values, self.codes_values): 2040 self.assertEqual(sv.value, cv[1]) 2041 2042 proc = self.Process(target=self._test, args=(values,)) 2043 proc.daemon = True 2044 proc.start() 2045 proc.join() 2046 2047 for sv, cv in zip(values, self.codes_values): 2048 self.assertEqual(sv.value, cv[2]) 2049 2050 def test_rawvalue(self): 2051 self.test_value(raw=True) 2052 2053 def test_getobj_getlock(self): 2054 val1 = self.Value('i', 5) 2055 lock1 = val1.get_lock() 2056 obj1 = val1.get_obj() 2057 2058 val2 = self.Value('i', 5, lock=None) 2059 lock2 = val2.get_lock() 2060 obj2 = val2.get_obj() 2061 2062 lock = self.Lock() 2063 val3 = self.Value('i', 5, lock=lock) 2064 lock3 = val3.get_lock() 2065 obj3 = val3.get_obj() 2066 self.assertEqual(lock, lock3) 2067 2068 arr4 = self.Value('i', 5, lock=False) 2069 self.assertFalse(hasattr(arr4, 'get_lock')) 2070 self.assertFalse(hasattr(arr4, 'get_obj')) 2071 2072 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue') 2073 2074 arr5 = self.RawValue('i', 5) 2075 self.assertFalse(hasattr(arr5, 'get_lock')) 2076 self.assertFalse(hasattr(arr5, 'get_obj')) 2077 2078 2079class _TestArray(BaseTestCase): 2080 2081 ALLOWED_TYPES = ('processes',) 2082 2083 @classmethod 2084 def f(cls, seq): 2085 for i in range(1, len(seq)): 2086 seq[i] += seq[i-1] 2087 2088 @unittest.skipIf(c_int is None, "requires _ctypes") 2089 def test_array(self, raw=False): 2090 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831] 2091 if raw: 2092 arr = self.RawArray('i', seq) 2093 else: 2094 arr = self.Array('i', seq) 2095 2096 self.assertEqual(len(arr), len(seq)) 2097 self.assertEqual(arr[3], seq[3]) 2098 self.assertEqual(list(arr[2:7]), list(seq[2:7])) 2099 2100 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4]) 2101 2102 self.assertEqual(list(arr[:]), seq) 2103 2104 self.f(seq) 2105 2106 p = self.Process(target=self.f, args=(arr,)) 2107 p.daemon = True 2108 p.start() 2109 p.join() 2110 2111 self.assertEqual(list(arr[:]), seq) 2112 2113 @unittest.skipIf(c_int is None, "requires _ctypes") 2114 def test_array_from_size(self): 2115 size = 10 2116 # Test for zeroing (see issue #11675). 2117 # The repetition below strengthens the test by increasing the chances 2118 # of previously allocated non-zero memory being used for the new array 2119 # on the 2nd and 3rd loops. 2120 for _ in range(3): 2121 arr = self.Array('i', size) 2122 self.assertEqual(len(arr), size) 2123 self.assertEqual(list(arr), [0] * size) 2124 arr[:] = range(10) 2125 self.assertEqual(list(arr), list(range(10))) 2126 del arr 2127 2128 @unittest.skipIf(c_int is None, "requires _ctypes") 2129 def test_rawarray(self): 2130 self.test_array(raw=True) 2131 2132 @unittest.skipIf(c_int is None, "requires _ctypes") 2133 def test_getobj_getlock_obj(self): 2134 arr1 = self.Array('i', list(range(10))) 2135 lock1 = arr1.get_lock() 2136 obj1 = arr1.get_obj() 2137 2138 arr2 = self.Array('i', list(range(10)), lock=None) 2139 lock2 = arr2.get_lock() 2140 obj2 = arr2.get_obj() 2141 2142 lock = self.Lock() 2143 arr3 = self.Array('i', list(range(10)), lock=lock) 2144 lock3 = arr3.get_lock() 2145 obj3 = arr3.get_obj() 2146 self.assertEqual(lock, lock3) 2147 2148 arr4 = self.Array('i', range(10), lock=False) 2149 self.assertFalse(hasattr(arr4, 'get_lock')) 2150 self.assertFalse(hasattr(arr4, 'get_obj')) 2151 self.assertRaises(AttributeError, 2152 self.Array, 'i', range(10), lock='notalock') 2153 2154 arr5 = self.RawArray('i', range(10)) 2155 self.assertFalse(hasattr(arr5, 'get_lock')) 2156 self.assertFalse(hasattr(arr5, 'get_obj')) 2157 2158# 2159# 2160# 2161 2162class _TestContainers(BaseTestCase): 2163 2164 ALLOWED_TYPES = ('manager',) 2165 2166 def test_list(self): 2167 a = self.list(list(range(10))) 2168 self.assertEqual(a[:], list(range(10))) 2169 2170 b = self.list() 2171 self.assertEqual(b[:], []) 2172 2173 b.extend(list(range(5))) 2174 self.assertEqual(b[:], list(range(5))) 2175 2176 self.assertEqual(b[2], 2) 2177 self.assertEqual(b[2:10], [2,3,4]) 2178 2179 b *= 2 2180 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]) 2181 2182 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6]) 2183 2184 self.assertEqual(a[:], list(range(10))) 2185 2186 d = [a, b] 2187 e = self.list(d) 2188 self.assertEqual( 2189 [element[:] for element in e], 2190 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]] 2191 ) 2192 2193 f = self.list([a]) 2194 a.append('hello') 2195 self.assertEqual(f[0][:], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']) 2196 2197 def test_list_iter(self): 2198 a = self.list(list(range(10))) 2199 it = iter(a) 2200 self.assertEqual(list(it), list(range(10))) 2201 self.assertEqual(list(it), []) # exhausted 2202 # list modified during iteration 2203 it = iter(a) 2204 a[0] = 100 2205 self.assertEqual(next(it), 100) 2206 2207 def test_list_proxy_in_list(self): 2208 a = self.list([self.list(range(3)) for _i in range(3)]) 2209 self.assertEqual([inner[:] for inner in a], [[0, 1, 2]] * 3) 2210 2211 a[0][-1] = 55 2212 self.assertEqual(a[0][:], [0, 1, 55]) 2213 for i in range(1, 3): 2214 self.assertEqual(a[i][:], [0, 1, 2]) 2215 2216 self.assertEqual(a[1].pop(), 2) 2217 self.assertEqual(len(a[1]), 2) 2218 for i in range(0, 3, 2): 2219 self.assertEqual(len(a[i]), 3) 2220 2221 del a 2222 2223 b = self.list() 2224 b.append(b) 2225 del b 2226 2227 def test_dict(self): 2228 d = self.dict() 2229 indices = list(range(65, 70)) 2230 for i in indices: 2231 d[i] = chr(i) 2232 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices)) 2233 self.assertEqual(sorted(d.keys()), indices) 2234 self.assertEqual(sorted(d.values()), [chr(i) for i in indices]) 2235 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices]) 2236 2237 def test_dict_iter(self): 2238 d = self.dict() 2239 indices = list(range(65, 70)) 2240 for i in indices: 2241 d[i] = chr(i) 2242 it = iter(d) 2243 self.assertEqual(list(it), indices) 2244 self.assertEqual(list(it), []) # exhausted 2245 # dictionary changed size during iteration 2246 it = iter(d) 2247 d.clear() 2248 self.assertRaises(RuntimeError, next, it) 2249 2250 def test_dict_proxy_nested(self): 2251 pets = self.dict(ferrets=2, hamsters=4) 2252 supplies = self.dict(water=10, feed=3) 2253 d = self.dict(pets=pets, supplies=supplies) 2254 2255 self.assertEqual(supplies['water'], 10) 2256 self.assertEqual(d['supplies']['water'], 10) 2257 2258 d['supplies']['blankets'] = 5 2259 self.assertEqual(supplies['blankets'], 5) 2260 self.assertEqual(d['supplies']['blankets'], 5) 2261 2262 d['supplies']['water'] = 7 2263 self.assertEqual(supplies['water'], 7) 2264 self.assertEqual(d['supplies']['water'], 7) 2265 2266 del pets 2267 del supplies 2268 self.assertEqual(d['pets']['ferrets'], 2) 2269 d['supplies']['blankets'] = 11 2270 self.assertEqual(d['supplies']['blankets'], 11) 2271 2272 pets = d['pets'] 2273 supplies = d['supplies'] 2274 supplies['water'] = 7 2275 self.assertEqual(supplies['water'], 7) 2276 self.assertEqual(d['supplies']['water'], 7) 2277 2278 d.clear() 2279 self.assertEqual(len(d), 0) 2280 self.assertEqual(supplies['water'], 7) 2281 self.assertEqual(pets['hamsters'], 4) 2282 2283 l = self.list([pets, supplies]) 2284 l[0]['marmots'] = 1 2285 self.assertEqual(pets['marmots'], 1) 2286 self.assertEqual(l[0]['marmots'], 1) 2287 2288 del pets 2289 del supplies 2290 self.assertEqual(l[0]['marmots'], 1) 2291 2292 outer = self.list([[88, 99], l]) 2293 self.assertIsInstance(outer[0], list) # Not a ListProxy 2294 self.assertEqual(outer[-1][-1]['feed'], 3) 2295 2296 def test_nested_queue(self): 2297 a = self.list() # Test queue inside list 2298 a.append(self.Queue()) 2299 a[0].put(123) 2300 self.assertEqual(a[0].get(), 123) 2301 b = self.dict() # Test queue inside dict 2302 b[0] = self.Queue() 2303 b[0].put(456) 2304 self.assertEqual(b[0].get(), 456) 2305 2306 def test_namespace(self): 2307 n = self.Namespace() 2308 n.name = 'Bob' 2309 n.job = 'Builder' 2310 n._hidden = 'hidden' 2311 self.assertEqual((n.name, n.job), ('Bob', 'Builder')) 2312 del n.job 2313 self.assertEqual(str(n), "Namespace(name='Bob')") 2314 self.assertTrue(hasattr(n, 'name')) 2315 self.assertTrue(not hasattr(n, 'job')) 2316 2317# 2318# 2319# 2320 2321def sqr(x, wait=0.0): 2322 time.sleep(wait) 2323 return x*x 2324 2325def mul(x, y): 2326 return x*y 2327 2328def raise_large_valuerror(wait): 2329 time.sleep(wait) 2330 raise ValueError("x" * 1024**2) 2331 2332def identity(x): 2333 return x 2334 2335class CountedObject(object): 2336 n_instances = 0 2337 2338 def __new__(cls): 2339 cls.n_instances += 1 2340 return object.__new__(cls) 2341 2342 def __del__(self): 2343 type(self).n_instances -= 1 2344 2345class SayWhenError(ValueError): pass 2346 2347def exception_throwing_generator(total, when): 2348 if when == -1: 2349 raise SayWhenError("Somebody said when") 2350 for i in range(total): 2351 if i == when: 2352 raise SayWhenError("Somebody said when") 2353 yield i 2354 2355 2356class _TestPool(BaseTestCase): 2357 2358 @classmethod 2359 def setUpClass(cls): 2360 super().setUpClass() 2361 cls.pool = cls.Pool(4) 2362 2363 @classmethod 2364 def tearDownClass(cls): 2365 cls.pool.terminate() 2366 cls.pool.join() 2367 cls.pool = None 2368 super().tearDownClass() 2369 2370 def test_apply(self): 2371 papply = self.pool.apply 2372 self.assertEqual(papply(sqr, (5,)), sqr(5)) 2373 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3)) 2374 2375 def test_map(self): 2376 pmap = self.pool.map 2377 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10))))) 2378 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20), 2379 list(map(sqr, list(range(100))))) 2380 2381 def test_starmap(self): 2382 psmap = self.pool.starmap 2383 tuples = list(zip(range(10), range(9,-1, -1))) 2384 self.assertEqual(psmap(mul, tuples), 2385 list(itertools.starmap(mul, tuples))) 2386 tuples = list(zip(range(100), range(99,-1, -1))) 2387 self.assertEqual(psmap(mul, tuples, chunksize=20), 2388 list(itertools.starmap(mul, tuples))) 2389 2390 def test_starmap_async(self): 2391 tuples = list(zip(range(100), range(99,-1, -1))) 2392 self.assertEqual(self.pool.starmap_async(mul, tuples).get(), 2393 list(itertools.starmap(mul, tuples))) 2394 2395 def test_map_async(self): 2396 self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(), 2397 list(map(sqr, list(range(10))))) 2398 2399 def test_map_async_callbacks(self): 2400 call_args = self.manager.list() if self.TYPE == 'manager' else [] 2401 self.pool.map_async(int, ['1'], 2402 callback=call_args.append, 2403 error_callback=call_args.append).wait() 2404 self.assertEqual(1, len(call_args)) 2405 self.assertEqual([1], call_args[0]) 2406 self.pool.map_async(int, ['a'], 2407 callback=call_args.append, 2408 error_callback=call_args.append).wait() 2409 self.assertEqual(2, len(call_args)) 2410 self.assertIsInstance(call_args[1], ValueError) 2411 2412 def test_map_unplicklable(self): 2413 # Issue #19425 -- failure to pickle should not cause a hang 2414 if self.TYPE == 'threads': 2415 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 2416 class A(object): 2417 def __reduce__(self): 2418 raise RuntimeError('cannot pickle') 2419 with self.assertRaises(RuntimeError): 2420 self.pool.map(sqr, [A()]*10) 2421 2422 def test_map_chunksize(self): 2423 try: 2424 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1) 2425 except multiprocessing.TimeoutError: 2426 self.fail("pool.map_async with chunksize stalled on null list") 2427 2428 def test_map_handle_iterable_exception(self): 2429 if self.TYPE == 'manager': 2430 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 2431 2432 # SayWhenError seen at the very first of the iterable 2433 with self.assertRaises(SayWhenError): 2434 self.pool.map(sqr, exception_throwing_generator(1, -1), 1) 2435 # again, make sure it's reentrant 2436 with self.assertRaises(SayWhenError): 2437 self.pool.map(sqr, exception_throwing_generator(1, -1), 1) 2438 2439 with self.assertRaises(SayWhenError): 2440 self.pool.map(sqr, exception_throwing_generator(10, 3), 1) 2441 2442 class SpecialIterable: 2443 def __iter__(self): 2444 return self 2445 def __next__(self): 2446 raise SayWhenError 2447 def __len__(self): 2448 return 1 2449 with self.assertRaises(SayWhenError): 2450 self.pool.map(sqr, SpecialIterable(), 1) 2451 with self.assertRaises(SayWhenError): 2452 self.pool.map(sqr, SpecialIterable(), 1) 2453 2454 def test_async(self): 2455 res = self.pool.apply_async(sqr, (7, TIMEOUT1,)) 2456 get = TimingWrapper(res.get) 2457 self.assertEqual(get(), 49) 2458 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1) 2459 2460 def test_async_timeout(self): 2461 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0)) 2462 get = TimingWrapper(res.get) 2463 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2) 2464 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2) 2465 2466 def test_imap(self): 2467 it = self.pool.imap(sqr, list(range(10))) 2468 self.assertEqual(list(it), list(map(sqr, list(range(10))))) 2469 2470 it = self.pool.imap(sqr, list(range(10))) 2471 for i in range(10): 2472 self.assertEqual(next(it), i*i) 2473 self.assertRaises(StopIteration, it.__next__) 2474 2475 it = self.pool.imap(sqr, list(range(1000)), chunksize=100) 2476 for i in range(1000): 2477 self.assertEqual(next(it), i*i) 2478 self.assertRaises(StopIteration, it.__next__) 2479 2480 def test_imap_handle_iterable_exception(self): 2481 if self.TYPE == 'manager': 2482 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 2483 2484 # SayWhenError seen at the very first of the iterable 2485 it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1) 2486 self.assertRaises(SayWhenError, it.__next__) 2487 # again, make sure it's reentrant 2488 it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1) 2489 self.assertRaises(SayWhenError, it.__next__) 2490 2491 it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1) 2492 for i in range(3): 2493 self.assertEqual(next(it), i*i) 2494 self.assertRaises(SayWhenError, it.__next__) 2495 2496 # SayWhenError seen at start of problematic chunk's results 2497 it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2) 2498 for i in range(6): 2499 self.assertEqual(next(it), i*i) 2500 self.assertRaises(SayWhenError, it.__next__) 2501 it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4) 2502 for i in range(4): 2503 self.assertEqual(next(it), i*i) 2504 self.assertRaises(SayWhenError, it.__next__) 2505 2506 def test_imap_unordered(self): 2507 it = self.pool.imap_unordered(sqr, list(range(10))) 2508 self.assertEqual(sorted(it), list(map(sqr, list(range(10))))) 2509 2510 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=100) 2511 self.assertEqual(sorted(it), list(map(sqr, list(range(1000))))) 2512 2513 def test_imap_unordered_handle_iterable_exception(self): 2514 if self.TYPE == 'manager': 2515 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 2516 2517 # SayWhenError seen at the very first of the iterable 2518 it = self.pool.imap_unordered(sqr, 2519 exception_throwing_generator(1, -1), 2520 1) 2521 self.assertRaises(SayWhenError, it.__next__) 2522 # again, make sure it's reentrant 2523 it = self.pool.imap_unordered(sqr, 2524 exception_throwing_generator(1, -1), 2525 1) 2526 self.assertRaises(SayWhenError, it.__next__) 2527 2528 it = self.pool.imap_unordered(sqr, 2529 exception_throwing_generator(10, 3), 2530 1) 2531 expected_values = list(map(sqr, list(range(10)))) 2532 with self.assertRaises(SayWhenError): 2533 # imap_unordered makes it difficult to anticipate the SayWhenError 2534 for i in range(10): 2535 value = next(it) 2536 self.assertIn(value, expected_values) 2537 expected_values.remove(value) 2538 2539 it = self.pool.imap_unordered(sqr, 2540 exception_throwing_generator(20, 7), 2541 2) 2542 expected_values = list(map(sqr, list(range(20)))) 2543 with self.assertRaises(SayWhenError): 2544 for i in range(20): 2545 value = next(it) 2546 self.assertIn(value, expected_values) 2547 expected_values.remove(value) 2548 2549 def test_make_pool(self): 2550 expected_error = (RemoteError if self.TYPE == 'manager' 2551 else ValueError) 2552 2553 self.assertRaises(expected_error, self.Pool, -1) 2554 self.assertRaises(expected_error, self.Pool, 0) 2555 2556 if self.TYPE != 'manager': 2557 p = self.Pool(3) 2558 try: 2559 self.assertEqual(3, len(p._pool)) 2560 finally: 2561 p.close() 2562 p.join() 2563 2564 def test_terminate(self): 2565 result = self.pool.map_async( 2566 time.sleep, [0.1 for i in range(10000)], chunksize=1 2567 ) 2568 self.pool.terminate() 2569 join = TimingWrapper(self.pool.join) 2570 join() 2571 # Sanity check the pool didn't wait for all tasks to finish 2572 self.assertLess(join.elapsed, 2.0) 2573 2574 def test_empty_iterable(self): 2575 # See Issue 12157 2576 p = self.Pool(1) 2577 2578 self.assertEqual(p.map(sqr, []), []) 2579 self.assertEqual(list(p.imap(sqr, [])), []) 2580 self.assertEqual(list(p.imap_unordered(sqr, [])), []) 2581 self.assertEqual(p.map_async(sqr, []).get(), []) 2582 2583 p.close() 2584 p.join() 2585 2586 def test_context(self): 2587 if self.TYPE == 'processes': 2588 L = list(range(10)) 2589 expected = [sqr(i) for i in L] 2590 with self.Pool(2) as p: 2591 r = p.map_async(sqr, L) 2592 self.assertEqual(r.get(), expected) 2593 p.join() 2594 self.assertRaises(ValueError, p.map_async, sqr, L) 2595 2596 @classmethod 2597 def _test_traceback(cls): 2598 raise RuntimeError(123) # some comment 2599 2600 def test_traceback(self): 2601 # We want ensure that the traceback from the child process is 2602 # contained in the traceback raised in the main process. 2603 if self.TYPE == 'processes': 2604 with self.Pool(1) as p: 2605 try: 2606 p.apply(self._test_traceback) 2607 except Exception as e: 2608 exc = e 2609 else: 2610 self.fail('expected RuntimeError') 2611 p.join() 2612 self.assertIs(type(exc), RuntimeError) 2613 self.assertEqual(exc.args, (123,)) 2614 cause = exc.__cause__ 2615 self.assertIs(type(cause), multiprocessing.pool.RemoteTraceback) 2616 self.assertIn('raise RuntimeError(123) # some comment', cause.tb) 2617 2618 with test.support.captured_stderr() as f1: 2619 try: 2620 raise exc 2621 except RuntimeError: 2622 sys.excepthook(*sys.exc_info()) 2623 self.assertIn('raise RuntimeError(123) # some comment', 2624 f1.getvalue()) 2625 # _helper_reraises_exception should not make the error 2626 # a remote exception 2627 with self.Pool(1) as p: 2628 try: 2629 p.map(sqr, exception_throwing_generator(1, -1), 1) 2630 except Exception as e: 2631 exc = e 2632 else: 2633 self.fail('expected SayWhenError') 2634 self.assertIs(type(exc), SayWhenError) 2635 self.assertIs(exc.__cause__, None) 2636 p.join() 2637 2638 @classmethod 2639 def _test_wrapped_exception(cls): 2640 raise RuntimeError('foo') 2641 2642 def test_wrapped_exception(self): 2643 # Issue #20980: Should not wrap exception when using thread pool 2644 with self.Pool(1) as p: 2645 with self.assertRaises(RuntimeError): 2646 p.apply(self._test_wrapped_exception) 2647 p.join() 2648 2649 def test_map_no_failfast(self): 2650 # Issue #23992: the fail-fast behaviour when an exception is raised 2651 # during map() would make Pool.join() deadlock, because a worker 2652 # process would fill the result queue (after the result handler thread 2653 # terminated, hence not draining it anymore). 2654 2655 t_start = time.monotonic() 2656 2657 with self.assertRaises(ValueError): 2658 with self.Pool(2) as p: 2659 try: 2660 p.map(raise_large_valuerror, [0, 1]) 2661 finally: 2662 time.sleep(0.5) 2663 p.close() 2664 p.join() 2665 2666 # check that we indeed waited for all jobs 2667 self.assertGreater(time.monotonic() - t_start, 0.9) 2668 2669 def test_release_task_refs(self): 2670 # Issue #29861: task arguments and results should not be kept 2671 # alive after we are done with them. 2672 objs = [CountedObject() for i in range(10)] 2673 refs = [weakref.ref(o) for o in objs] 2674 self.pool.map(identity, objs) 2675 2676 del objs 2677 gc.collect() # For PyPy or other GCs. 2678 time.sleep(DELTA) # let threaded cleanup code run 2679 self.assertEqual(set(wr() for wr in refs), {None}) 2680 # With a process pool, copies of the objects are returned, check 2681 # they were released too. 2682 self.assertEqual(CountedObject.n_instances, 0) 2683 2684 def test_enter(self): 2685 if self.TYPE == 'manager': 2686 self.skipTest("test not applicable to manager") 2687 2688 pool = self.Pool(1) 2689 with pool: 2690 pass 2691 # call pool.terminate() 2692 # pool is no longer running 2693 2694 with self.assertRaises(ValueError): 2695 # bpo-35477: pool.__enter__() fails if the pool is not running 2696 with pool: 2697 pass 2698 pool.join() 2699 2700 def test_resource_warning(self): 2701 if self.TYPE == 'manager': 2702 self.skipTest("test not applicable to manager") 2703 2704 pool = self.Pool(1) 2705 pool.terminate() 2706 pool.join() 2707 2708 # force state to RUN to emit ResourceWarning in __del__() 2709 pool._state = multiprocessing.pool.RUN 2710 2711 with warnings_helper.check_warnings( 2712 ('unclosed running multiprocessing pool', ResourceWarning)): 2713 pool = None 2714 support.gc_collect() 2715 2716def raising(): 2717 raise KeyError("key") 2718 2719def unpickleable_result(): 2720 return lambda: 42 2721 2722class _TestPoolWorkerErrors(BaseTestCase): 2723 ALLOWED_TYPES = ('processes', ) 2724 2725 def test_async_error_callback(self): 2726 p = multiprocessing.Pool(2) 2727 2728 scratchpad = [None] 2729 def errback(exc): 2730 scratchpad[0] = exc 2731 2732 res = p.apply_async(raising, error_callback=errback) 2733 self.assertRaises(KeyError, res.get) 2734 self.assertTrue(scratchpad[0]) 2735 self.assertIsInstance(scratchpad[0], KeyError) 2736 2737 p.close() 2738 p.join() 2739 2740 def test_unpickleable_result(self): 2741 from multiprocessing.pool import MaybeEncodingError 2742 p = multiprocessing.Pool(2) 2743 2744 # Make sure we don't lose pool processes because of encoding errors. 2745 for iteration in range(20): 2746 2747 scratchpad = [None] 2748 def errback(exc): 2749 scratchpad[0] = exc 2750 2751 res = p.apply_async(unpickleable_result, error_callback=errback) 2752 self.assertRaises(MaybeEncodingError, res.get) 2753 wrapped = scratchpad[0] 2754 self.assertTrue(wrapped) 2755 self.assertIsInstance(scratchpad[0], MaybeEncodingError) 2756 self.assertIsNotNone(wrapped.exc) 2757 self.assertIsNotNone(wrapped.value) 2758 2759 p.close() 2760 p.join() 2761 2762class _TestPoolWorkerLifetime(BaseTestCase): 2763 ALLOWED_TYPES = ('processes', ) 2764 2765 def test_pool_worker_lifetime(self): 2766 p = multiprocessing.Pool(3, maxtasksperchild=10) 2767 self.assertEqual(3, len(p._pool)) 2768 origworkerpids = [w.pid for w in p._pool] 2769 # Run many tasks so each worker gets replaced (hopefully) 2770 results = [] 2771 for i in range(100): 2772 results.append(p.apply_async(sqr, (i, ))) 2773 # Fetch the results and verify we got the right answers, 2774 # also ensuring all the tasks have completed. 2775 for (j, res) in enumerate(results): 2776 self.assertEqual(res.get(), sqr(j)) 2777 # Refill the pool 2778 p._repopulate_pool() 2779 # Wait until all workers are alive 2780 # (countdown * DELTA = 5 seconds max startup process time) 2781 countdown = 50 2782 while countdown and not all(w.is_alive() for w in p._pool): 2783 countdown -= 1 2784 time.sleep(DELTA) 2785 finalworkerpids = [w.pid for w in p._pool] 2786 # All pids should be assigned. See issue #7805. 2787 self.assertNotIn(None, origworkerpids) 2788 self.assertNotIn(None, finalworkerpids) 2789 # Finally, check that the worker pids have changed 2790 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids)) 2791 p.close() 2792 p.join() 2793 2794 def test_pool_worker_lifetime_early_close(self): 2795 # Issue #10332: closing a pool whose workers have limited lifetimes 2796 # before all the tasks completed would make join() hang. 2797 p = multiprocessing.Pool(3, maxtasksperchild=1) 2798 results = [] 2799 for i in range(6): 2800 results.append(p.apply_async(sqr, (i, 0.3))) 2801 p.close() 2802 p.join() 2803 # check the results 2804 for (j, res) in enumerate(results): 2805 self.assertEqual(res.get(), sqr(j)) 2806 2807 def test_worker_finalization_via_atexit_handler_of_multiprocessing(self): 2808 # tests cases against bpo-38744 and bpo-39360 2809 cmd = '''if 1: 2810 from multiprocessing import Pool 2811 problem = None 2812 class A: 2813 def __init__(self): 2814 self.pool = Pool(processes=1) 2815 def test(): 2816 global problem 2817 problem = A() 2818 problem.pool.map(float, tuple(range(10))) 2819 if __name__ == "__main__": 2820 test() 2821 ''' 2822 rc, out, err = test.support.script_helper.assert_python_ok('-c', cmd) 2823 self.assertEqual(rc, 0) 2824 2825# 2826# Test of creating a customized manager class 2827# 2828 2829from multiprocessing.managers import BaseManager, BaseProxy, RemoteError 2830 2831class FooBar(object): 2832 def f(self): 2833 return 'f()' 2834 def g(self): 2835 raise ValueError 2836 def _h(self): 2837 return '_h()' 2838 2839def baz(): 2840 for i in range(10): 2841 yield i*i 2842 2843class IteratorProxy(BaseProxy): 2844 _exposed_ = ('__next__',) 2845 def __iter__(self): 2846 return self 2847 def __next__(self): 2848 return self._callmethod('__next__') 2849 2850class MyManager(BaseManager): 2851 pass 2852 2853MyManager.register('Foo', callable=FooBar) 2854MyManager.register('Bar', callable=FooBar, exposed=('f', '_h')) 2855MyManager.register('baz', callable=baz, proxytype=IteratorProxy) 2856 2857 2858class _TestMyManager(BaseTestCase): 2859 2860 ALLOWED_TYPES = ('manager',) 2861 2862 def test_mymanager(self): 2863 manager = MyManager() 2864 manager.start() 2865 self.common(manager) 2866 manager.shutdown() 2867 2868 # bpo-30356: BaseManager._finalize_manager() sends SIGTERM 2869 # to the manager process if it takes longer than 1 second to stop, 2870 # which happens on slow buildbots. 2871 self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM)) 2872 2873 def test_mymanager_context(self): 2874 with MyManager() as manager: 2875 self.common(manager) 2876 # bpo-30356: BaseManager._finalize_manager() sends SIGTERM 2877 # to the manager process if it takes longer than 1 second to stop, 2878 # which happens on slow buildbots. 2879 self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM)) 2880 2881 def test_mymanager_context_prestarted(self): 2882 manager = MyManager() 2883 manager.start() 2884 with manager: 2885 self.common(manager) 2886 self.assertEqual(manager._process.exitcode, 0) 2887 2888 def common(self, manager): 2889 foo = manager.Foo() 2890 bar = manager.Bar() 2891 baz = manager.baz() 2892 2893 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)] 2894 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)] 2895 2896 self.assertEqual(foo_methods, ['f', 'g']) 2897 self.assertEqual(bar_methods, ['f', '_h']) 2898 2899 self.assertEqual(foo.f(), 'f()') 2900 self.assertRaises(ValueError, foo.g) 2901 self.assertEqual(foo._callmethod('f'), 'f()') 2902 self.assertRaises(RemoteError, foo._callmethod, '_h') 2903 2904 self.assertEqual(bar.f(), 'f()') 2905 self.assertEqual(bar._h(), '_h()') 2906 self.assertEqual(bar._callmethod('f'), 'f()') 2907 self.assertEqual(bar._callmethod('_h'), '_h()') 2908 2909 self.assertEqual(list(baz), [i*i for i in range(10)]) 2910 2911 2912# 2913# Test of connecting to a remote server and using xmlrpclib for serialization 2914# 2915 2916_queue = pyqueue.Queue() 2917def get_queue(): 2918 return _queue 2919 2920class QueueManager(BaseManager): 2921 '''manager class used by server process''' 2922QueueManager.register('get_queue', callable=get_queue) 2923 2924class QueueManager2(BaseManager): 2925 '''manager class which specifies the same interface as QueueManager''' 2926QueueManager2.register('get_queue') 2927 2928 2929SERIALIZER = 'xmlrpclib' 2930 2931class _TestRemoteManager(BaseTestCase): 2932 2933 ALLOWED_TYPES = ('manager',) 2934 values = ['hello world', None, True, 2.25, 2935 'hall\xe5 v\xe4rlden', 2936 '\u043f\u0440\u0438\u0432\u0456\u0442 \u0441\u0432\u0456\u0442', 2937 b'hall\xe5 v\xe4rlden', 2938 ] 2939 result = values[:] 2940 2941 @classmethod 2942 def _putter(cls, address, authkey): 2943 manager = QueueManager2( 2944 address=address, authkey=authkey, serializer=SERIALIZER 2945 ) 2946 manager.connect() 2947 queue = manager.get_queue() 2948 # Note that xmlrpclib will deserialize object as a list not a tuple 2949 queue.put(tuple(cls.values)) 2950 2951 def test_remote(self): 2952 authkey = os.urandom(32) 2953 2954 manager = QueueManager( 2955 address=(socket_helper.HOST, 0), authkey=authkey, serializer=SERIALIZER 2956 ) 2957 manager.start() 2958 self.addCleanup(manager.shutdown) 2959 2960 p = self.Process(target=self._putter, args=(manager.address, authkey)) 2961 p.daemon = True 2962 p.start() 2963 2964 manager2 = QueueManager2( 2965 address=manager.address, authkey=authkey, serializer=SERIALIZER 2966 ) 2967 manager2.connect() 2968 queue = manager2.get_queue() 2969 2970 self.assertEqual(queue.get(), self.result) 2971 2972 # Because we are using xmlrpclib for serialization instead of 2973 # pickle this will cause a serialization error. 2974 self.assertRaises(Exception, queue.put, time.sleep) 2975 2976 # Make queue finalizer run before the server is stopped 2977 del queue 2978 2979 2980@hashlib_helper.requires_hashdigest('md5') 2981class _TestManagerRestart(BaseTestCase): 2982 2983 @classmethod 2984 def _putter(cls, address, authkey): 2985 manager = QueueManager( 2986 address=address, authkey=authkey, serializer=SERIALIZER) 2987 manager.connect() 2988 queue = manager.get_queue() 2989 queue.put('hello world') 2990 2991 def test_rapid_restart(self): 2992 authkey = os.urandom(32) 2993 manager = QueueManager( 2994 address=(socket_helper.HOST, 0), authkey=authkey, serializer=SERIALIZER) 2995 try: 2996 srvr = manager.get_server() 2997 addr = srvr.address 2998 # Close the connection.Listener socket which gets opened as a part 2999 # of manager.get_server(). It's not needed for the test. 3000 srvr.listener.close() 3001 manager.start() 3002 3003 p = self.Process(target=self._putter, args=(manager.address, authkey)) 3004 p.start() 3005 p.join() 3006 queue = manager.get_queue() 3007 self.assertEqual(queue.get(), 'hello world') 3008 del queue 3009 finally: 3010 if hasattr(manager, "shutdown"): 3011 manager.shutdown() 3012 3013 manager = QueueManager( 3014 address=addr, authkey=authkey, serializer=SERIALIZER) 3015 try: 3016 manager.start() 3017 self.addCleanup(manager.shutdown) 3018 except OSError as e: 3019 if e.errno != errno.EADDRINUSE: 3020 raise 3021 # Retry after some time, in case the old socket was lingering 3022 # (sporadic failure on buildbots) 3023 time.sleep(1.0) 3024 manager = QueueManager( 3025 address=addr, authkey=authkey, serializer=SERIALIZER) 3026 if hasattr(manager, "shutdown"): 3027 self.addCleanup(manager.shutdown) 3028 3029# 3030# 3031# 3032 3033SENTINEL = latin('') 3034 3035class _TestConnection(BaseTestCase): 3036 3037 ALLOWED_TYPES = ('processes', 'threads') 3038 3039 @classmethod 3040 def _echo(cls, conn): 3041 for msg in iter(conn.recv_bytes, SENTINEL): 3042 conn.send_bytes(msg) 3043 conn.close() 3044 3045 def test_connection(self): 3046 conn, child_conn = self.Pipe() 3047 3048 p = self.Process(target=self._echo, args=(child_conn,)) 3049 p.daemon = True 3050 p.start() 3051 3052 seq = [1, 2.25, None] 3053 msg = latin('hello world') 3054 longmsg = msg * 10 3055 arr = array.array('i', list(range(4))) 3056 3057 if self.TYPE == 'processes': 3058 self.assertEqual(type(conn.fileno()), int) 3059 3060 self.assertEqual(conn.send(seq), None) 3061 self.assertEqual(conn.recv(), seq) 3062 3063 self.assertEqual(conn.send_bytes(msg), None) 3064 self.assertEqual(conn.recv_bytes(), msg) 3065 3066 if self.TYPE == 'processes': 3067 buffer = array.array('i', [0]*10) 3068 expected = list(arr) + [0] * (10 - len(arr)) 3069 self.assertEqual(conn.send_bytes(arr), None) 3070 self.assertEqual(conn.recv_bytes_into(buffer), 3071 len(arr) * buffer.itemsize) 3072 self.assertEqual(list(buffer), expected) 3073 3074 buffer = array.array('i', [0]*10) 3075 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr)) 3076 self.assertEqual(conn.send_bytes(arr), None) 3077 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize), 3078 len(arr) * buffer.itemsize) 3079 self.assertEqual(list(buffer), expected) 3080 3081 buffer = bytearray(latin(' ' * 40)) 3082 self.assertEqual(conn.send_bytes(longmsg), None) 3083 try: 3084 res = conn.recv_bytes_into(buffer) 3085 except multiprocessing.BufferTooShort as e: 3086 self.assertEqual(e.args, (longmsg,)) 3087 else: 3088 self.fail('expected BufferTooShort, got %s' % res) 3089 3090 poll = TimingWrapper(conn.poll) 3091 3092 self.assertEqual(poll(), False) 3093 self.assertTimingAlmostEqual(poll.elapsed, 0) 3094 3095 self.assertEqual(poll(-1), False) 3096 self.assertTimingAlmostEqual(poll.elapsed, 0) 3097 3098 self.assertEqual(poll(TIMEOUT1), False) 3099 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1) 3100 3101 conn.send(None) 3102 time.sleep(.1) 3103 3104 self.assertEqual(poll(TIMEOUT1), True) 3105 self.assertTimingAlmostEqual(poll.elapsed, 0) 3106 3107 self.assertEqual(conn.recv(), None) 3108 3109 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb 3110 conn.send_bytes(really_big_msg) 3111 self.assertEqual(conn.recv_bytes(), really_big_msg) 3112 3113 conn.send_bytes(SENTINEL) # tell child to quit 3114 child_conn.close() 3115 3116 if self.TYPE == 'processes': 3117 self.assertEqual(conn.readable, True) 3118 self.assertEqual(conn.writable, True) 3119 self.assertRaises(EOFError, conn.recv) 3120 self.assertRaises(EOFError, conn.recv_bytes) 3121 3122 p.join() 3123 3124 def test_duplex_false(self): 3125 reader, writer = self.Pipe(duplex=False) 3126 self.assertEqual(writer.send(1), None) 3127 self.assertEqual(reader.recv(), 1) 3128 if self.TYPE == 'processes': 3129 self.assertEqual(reader.readable, True) 3130 self.assertEqual(reader.writable, False) 3131 self.assertEqual(writer.readable, False) 3132 self.assertEqual(writer.writable, True) 3133 self.assertRaises(OSError, reader.send, 2) 3134 self.assertRaises(OSError, writer.recv) 3135 self.assertRaises(OSError, writer.poll) 3136 3137 def test_spawn_close(self): 3138 # We test that a pipe connection can be closed by parent 3139 # process immediately after child is spawned. On Windows this 3140 # would have sometimes failed on old versions because 3141 # child_conn would be closed before the child got a chance to 3142 # duplicate it. 3143 conn, child_conn = self.Pipe() 3144 3145 p = self.Process(target=self._echo, args=(child_conn,)) 3146 p.daemon = True 3147 p.start() 3148 child_conn.close() # this might complete before child initializes 3149 3150 msg = latin('hello') 3151 conn.send_bytes(msg) 3152 self.assertEqual(conn.recv_bytes(), msg) 3153 3154 conn.send_bytes(SENTINEL) 3155 conn.close() 3156 p.join() 3157 3158 def test_sendbytes(self): 3159 if self.TYPE != 'processes': 3160 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 3161 3162 msg = latin('abcdefghijklmnopqrstuvwxyz') 3163 a, b = self.Pipe() 3164 3165 a.send_bytes(msg) 3166 self.assertEqual(b.recv_bytes(), msg) 3167 3168 a.send_bytes(msg, 5) 3169 self.assertEqual(b.recv_bytes(), msg[5:]) 3170 3171 a.send_bytes(msg, 7, 8) 3172 self.assertEqual(b.recv_bytes(), msg[7:7+8]) 3173 3174 a.send_bytes(msg, 26) 3175 self.assertEqual(b.recv_bytes(), latin('')) 3176 3177 a.send_bytes(msg, 26, 0) 3178 self.assertEqual(b.recv_bytes(), latin('')) 3179 3180 self.assertRaises(ValueError, a.send_bytes, msg, 27) 3181 3182 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5) 3183 3184 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1) 3185 3186 self.assertRaises(ValueError, a.send_bytes, msg, -1) 3187 3188 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1) 3189 3190 @classmethod 3191 def _is_fd_assigned(cls, fd): 3192 try: 3193 os.fstat(fd) 3194 except OSError as e: 3195 if e.errno == errno.EBADF: 3196 return False 3197 raise 3198 else: 3199 return True 3200 3201 @classmethod 3202 def _writefd(cls, conn, data, create_dummy_fds=False): 3203 if create_dummy_fds: 3204 for i in range(0, 256): 3205 if not cls._is_fd_assigned(i): 3206 os.dup2(conn.fileno(), i) 3207 fd = reduction.recv_handle(conn) 3208 if msvcrt: 3209 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY) 3210 os.write(fd, data) 3211 os.close(fd) 3212 3213 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 3214 def test_fd_transfer(self): 3215 if self.TYPE != 'processes': 3216 self.skipTest("only makes sense with processes") 3217 conn, child_conn = self.Pipe(duplex=True) 3218 3219 p = self.Process(target=self._writefd, args=(child_conn, b"foo")) 3220 p.daemon = True 3221 p.start() 3222 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 3223 with open(os_helper.TESTFN, "wb") as f: 3224 fd = f.fileno() 3225 if msvcrt: 3226 fd = msvcrt.get_osfhandle(fd) 3227 reduction.send_handle(conn, fd, p.pid) 3228 p.join() 3229 with open(os_helper.TESTFN, "rb") as f: 3230 self.assertEqual(f.read(), b"foo") 3231 3232 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 3233 @unittest.skipIf(sys.platform == "win32", 3234 "test semantics don't make sense on Windows") 3235 @unittest.skipIf(MAXFD <= 256, 3236 "largest assignable fd number is too small") 3237 @unittest.skipUnless(hasattr(os, "dup2"), 3238 "test needs os.dup2()") 3239 def test_large_fd_transfer(self): 3240 # With fd > 256 (issue #11657) 3241 if self.TYPE != 'processes': 3242 self.skipTest("only makes sense with processes") 3243 conn, child_conn = self.Pipe(duplex=True) 3244 3245 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True)) 3246 p.daemon = True 3247 p.start() 3248 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 3249 with open(os_helper.TESTFN, "wb") as f: 3250 fd = f.fileno() 3251 for newfd in range(256, MAXFD): 3252 if not self._is_fd_assigned(newfd): 3253 break 3254 else: 3255 self.fail("could not find an unassigned large file descriptor") 3256 os.dup2(fd, newfd) 3257 try: 3258 reduction.send_handle(conn, newfd, p.pid) 3259 finally: 3260 os.close(newfd) 3261 p.join() 3262 with open(os_helper.TESTFN, "rb") as f: 3263 self.assertEqual(f.read(), b"bar") 3264 3265 @classmethod 3266 def _send_data_without_fd(self, conn): 3267 os.write(conn.fileno(), b"\0") 3268 3269 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 3270 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows") 3271 def test_missing_fd_transfer(self): 3272 # Check that exception is raised when received data is not 3273 # accompanied by a file descriptor in ancillary data. 3274 if self.TYPE != 'processes': 3275 self.skipTest("only makes sense with processes") 3276 conn, child_conn = self.Pipe(duplex=True) 3277 3278 p = self.Process(target=self._send_data_without_fd, args=(child_conn,)) 3279 p.daemon = True 3280 p.start() 3281 self.assertRaises(RuntimeError, reduction.recv_handle, conn) 3282 p.join() 3283 3284 def test_context(self): 3285 a, b = self.Pipe() 3286 3287 with a, b: 3288 a.send(1729) 3289 self.assertEqual(b.recv(), 1729) 3290 if self.TYPE == 'processes': 3291 self.assertFalse(a.closed) 3292 self.assertFalse(b.closed) 3293 3294 if self.TYPE == 'processes': 3295 self.assertTrue(a.closed) 3296 self.assertTrue(b.closed) 3297 self.assertRaises(OSError, a.recv) 3298 self.assertRaises(OSError, b.recv) 3299 3300class _TestListener(BaseTestCase): 3301 3302 ALLOWED_TYPES = ('processes',) 3303 3304 def test_multiple_bind(self): 3305 for family in self.connection.families: 3306 l = self.connection.Listener(family=family) 3307 self.addCleanup(l.close) 3308 self.assertRaises(OSError, self.connection.Listener, 3309 l.address, family) 3310 3311 def test_context(self): 3312 with self.connection.Listener() as l: 3313 with self.connection.Client(l.address) as c: 3314 with l.accept() as d: 3315 c.send(1729) 3316 self.assertEqual(d.recv(), 1729) 3317 3318 if self.TYPE == 'processes': 3319 self.assertRaises(OSError, l.accept) 3320 3321 @unittest.skipUnless(util.abstract_sockets_supported, 3322 "test needs abstract socket support") 3323 def test_abstract_socket(self): 3324 with self.connection.Listener("\0something") as listener: 3325 with self.connection.Client(listener.address) as client: 3326 with listener.accept() as d: 3327 client.send(1729) 3328 self.assertEqual(d.recv(), 1729) 3329 3330 if self.TYPE == 'processes': 3331 self.assertRaises(OSError, listener.accept) 3332 3333 3334class _TestListenerClient(BaseTestCase): 3335 3336 ALLOWED_TYPES = ('processes', 'threads') 3337 3338 @classmethod 3339 def _test(cls, address): 3340 conn = cls.connection.Client(address) 3341 conn.send('hello') 3342 conn.close() 3343 3344 def test_listener_client(self): 3345 for family in self.connection.families: 3346 l = self.connection.Listener(family=family) 3347 p = self.Process(target=self._test, args=(l.address,)) 3348 p.daemon = True 3349 p.start() 3350 conn = l.accept() 3351 self.assertEqual(conn.recv(), 'hello') 3352 p.join() 3353 l.close() 3354 3355 def test_issue14725(self): 3356 l = self.connection.Listener() 3357 p = self.Process(target=self._test, args=(l.address,)) 3358 p.daemon = True 3359 p.start() 3360 time.sleep(1) 3361 # On Windows the client process should by now have connected, 3362 # written data and closed the pipe handle by now. This causes 3363 # ConnectNamdedPipe() to fail with ERROR_NO_DATA. See Issue 3364 # 14725. 3365 conn = l.accept() 3366 self.assertEqual(conn.recv(), 'hello') 3367 conn.close() 3368 p.join() 3369 l.close() 3370 3371 def test_issue16955(self): 3372 for fam in self.connection.families: 3373 l = self.connection.Listener(family=fam) 3374 c = self.connection.Client(l.address) 3375 a = l.accept() 3376 a.send_bytes(b"hello") 3377 self.assertTrue(c.poll(1)) 3378 a.close() 3379 c.close() 3380 l.close() 3381 3382class _TestPoll(BaseTestCase): 3383 3384 ALLOWED_TYPES = ('processes', 'threads') 3385 3386 def test_empty_string(self): 3387 a, b = self.Pipe() 3388 self.assertEqual(a.poll(), False) 3389 b.send_bytes(b'') 3390 self.assertEqual(a.poll(), True) 3391 self.assertEqual(a.poll(), True) 3392 3393 @classmethod 3394 def _child_strings(cls, conn, strings): 3395 for s in strings: 3396 time.sleep(0.1) 3397 conn.send_bytes(s) 3398 conn.close() 3399 3400 def test_strings(self): 3401 strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop') 3402 a, b = self.Pipe() 3403 p = self.Process(target=self._child_strings, args=(b, strings)) 3404 p.start() 3405 3406 for s in strings: 3407 for i in range(200): 3408 if a.poll(0.01): 3409 break 3410 x = a.recv_bytes() 3411 self.assertEqual(s, x) 3412 3413 p.join() 3414 3415 @classmethod 3416 def _child_boundaries(cls, r): 3417 # Polling may "pull" a message in to the child process, but we 3418 # don't want it to pull only part of a message, as that would 3419 # corrupt the pipe for any other processes which might later 3420 # read from it. 3421 r.poll(5) 3422 3423 def test_boundaries(self): 3424 r, w = self.Pipe(False) 3425 p = self.Process(target=self._child_boundaries, args=(r,)) 3426 p.start() 3427 time.sleep(2) 3428 L = [b"first", b"second"] 3429 for obj in L: 3430 w.send_bytes(obj) 3431 w.close() 3432 p.join() 3433 self.assertIn(r.recv_bytes(), L) 3434 3435 @classmethod 3436 def _child_dont_merge(cls, b): 3437 b.send_bytes(b'a') 3438 b.send_bytes(b'b') 3439 b.send_bytes(b'cd') 3440 3441 def test_dont_merge(self): 3442 a, b = self.Pipe() 3443 self.assertEqual(a.poll(0.0), False) 3444 self.assertEqual(a.poll(0.1), False) 3445 3446 p = self.Process(target=self._child_dont_merge, args=(b,)) 3447 p.start() 3448 3449 self.assertEqual(a.recv_bytes(), b'a') 3450 self.assertEqual(a.poll(1.0), True) 3451 self.assertEqual(a.poll(1.0), True) 3452 self.assertEqual(a.recv_bytes(), b'b') 3453 self.assertEqual(a.poll(1.0), True) 3454 self.assertEqual(a.poll(1.0), True) 3455 self.assertEqual(a.poll(0.0), True) 3456 self.assertEqual(a.recv_bytes(), b'cd') 3457 3458 p.join() 3459 3460# 3461# Test of sending connection and socket objects between processes 3462# 3463 3464@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 3465@hashlib_helper.requires_hashdigest('md5') 3466class _TestPicklingConnections(BaseTestCase): 3467 3468 ALLOWED_TYPES = ('processes',) 3469 3470 @classmethod 3471 def tearDownClass(cls): 3472 from multiprocessing import resource_sharer 3473 resource_sharer.stop(timeout=support.LONG_TIMEOUT) 3474 3475 @classmethod 3476 def _listener(cls, conn, families): 3477 for fam in families: 3478 l = cls.connection.Listener(family=fam) 3479 conn.send(l.address) 3480 new_conn = l.accept() 3481 conn.send(new_conn) 3482 new_conn.close() 3483 l.close() 3484 3485 l = socket.create_server((socket_helper.HOST, 0)) 3486 conn.send(l.getsockname()) 3487 new_conn, addr = l.accept() 3488 conn.send(new_conn) 3489 new_conn.close() 3490 l.close() 3491 3492 conn.recv() 3493 3494 @classmethod 3495 def _remote(cls, conn): 3496 for (address, msg) in iter(conn.recv, None): 3497 client = cls.connection.Client(address) 3498 client.send(msg.upper()) 3499 client.close() 3500 3501 address, msg = conn.recv() 3502 client = socket.socket() 3503 client.connect(address) 3504 client.sendall(msg.upper()) 3505 client.close() 3506 3507 conn.close() 3508 3509 def test_pickling(self): 3510 families = self.connection.families 3511 3512 lconn, lconn0 = self.Pipe() 3513 lp = self.Process(target=self._listener, args=(lconn0, families)) 3514 lp.daemon = True 3515 lp.start() 3516 lconn0.close() 3517 3518 rconn, rconn0 = self.Pipe() 3519 rp = self.Process(target=self._remote, args=(rconn0,)) 3520 rp.daemon = True 3521 rp.start() 3522 rconn0.close() 3523 3524 for fam in families: 3525 msg = ('This connection uses family %s' % fam).encode('ascii') 3526 address = lconn.recv() 3527 rconn.send((address, msg)) 3528 new_conn = lconn.recv() 3529 self.assertEqual(new_conn.recv(), msg.upper()) 3530 3531 rconn.send(None) 3532 3533 msg = latin('This connection uses a normal socket') 3534 address = lconn.recv() 3535 rconn.send((address, msg)) 3536 new_conn = lconn.recv() 3537 buf = [] 3538 while True: 3539 s = new_conn.recv(100) 3540 if not s: 3541 break 3542 buf.append(s) 3543 buf = b''.join(buf) 3544 self.assertEqual(buf, msg.upper()) 3545 new_conn.close() 3546 3547 lconn.send(None) 3548 3549 rconn.close() 3550 lconn.close() 3551 3552 lp.join() 3553 rp.join() 3554 3555 @classmethod 3556 def child_access(cls, conn): 3557 w = conn.recv() 3558 w.send('all is well') 3559 w.close() 3560 3561 r = conn.recv() 3562 msg = r.recv() 3563 conn.send(msg*2) 3564 3565 conn.close() 3566 3567 def test_access(self): 3568 # On Windows, if we do not specify a destination pid when 3569 # using DupHandle then we need to be careful to use the 3570 # correct access flags for DuplicateHandle(), or else 3571 # DupHandle.detach() will raise PermissionError. For example, 3572 # for a read only pipe handle we should use 3573 # access=FILE_GENERIC_READ. (Unfortunately 3574 # DUPLICATE_SAME_ACCESS does not work.) 3575 conn, child_conn = self.Pipe() 3576 p = self.Process(target=self.child_access, args=(child_conn,)) 3577 p.daemon = True 3578 p.start() 3579 child_conn.close() 3580 3581 r, w = self.Pipe(duplex=False) 3582 conn.send(w) 3583 w.close() 3584 self.assertEqual(r.recv(), 'all is well') 3585 r.close() 3586 3587 r, w = self.Pipe(duplex=False) 3588 conn.send(r) 3589 r.close() 3590 w.send('foobar') 3591 w.close() 3592 self.assertEqual(conn.recv(), 'foobar'*2) 3593 3594 p.join() 3595 3596# 3597# 3598# 3599 3600class _TestHeap(BaseTestCase): 3601 3602 ALLOWED_TYPES = ('processes',) 3603 3604 def setUp(self): 3605 super().setUp() 3606 # Make pristine heap for these tests 3607 self.old_heap = multiprocessing.heap.BufferWrapper._heap 3608 multiprocessing.heap.BufferWrapper._heap = multiprocessing.heap.Heap() 3609 3610 def tearDown(self): 3611 multiprocessing.heap.BufferWrapper._heap = self.old_heap 3612 super().tearDown() 3613 3614 def test_heap(self): 3615 iterations = 5000 3616 maxblocks = 50 3617 blocks = [] 3618 3619 # get the heap object 3620 heap = multiprocessing.heap.BufferWrapper._heap 3621 heap._DISCARD_FREE_SPACE_LARGER_THAN = 0 3622 3623 # create and destroy lots of blocks of different sizes 3624 for i in range(iterations): 3625 size = int(random.lognormvariate(0, 1) * 1000) 3626 b = multiprocessing.heap.BufferWrapper(size) 3627 blocks.append(b) 3628 if len(blocks) > maxblocks: 3629 i = random.randrange(maxblocks) 3630 del blocks[i] 3631 del b 3632 3633 # verify the state of the heap 3634 with heap._lock: 3635 all = [] 3636 free = 0 3637 occupied = 0 3638 for L in list(heap._len_to_seq.values()): 3639 # count all free blocks in arenas 3640 for arena, start, stop in L: 3641 all.append((heap._arenas.index(arena), start, stop, 3642 stop-start, 'free')) 3643 free += (stop-start) 3644 for arena, arena_blocks in heap._allocated_blocks.items(): 3645 # count all allocated blocks in arenas 3646 for start, stop in arena_blocks: 3647 all.append((heap._arenas.index(arena), start, stop, 3648 stop-start, 'occupied')) 3649 occupied += (stop-start) 3650 3651 self.assertEqual(free + occupied, 3652 sum(arena.size for arena in heap._arenas)) 3653 3654 all.sort() 3655 3656 for i in range(len(all)-1): 3657 (arena, start, stop) = all[i][:3] 3658 (narena, nstart, nstop) = all[i+1][:3] 3659 if arena != narena: 3660 # Two different arenas 3661 self.assertEqual(stop, heap._arenas[arena].size) # last block 3662 self.assertEqual(nstart, 0) # first block 3663 else: 3664 # Same arena: two adjacent blocks 3665 self.assertEqual(stop, nstart) 3666 3667 # test free'ing all blocks 3668 random.shuffle(blocks) 3669 while blocks: 3670 blocks.pop() 3671 3672 self.assertEqual(heap._n_frees, heap._n_mallocs) 3673 self.assertEqual(len(heap._pending_free_blocks), 0) 3674 self.assertEqual(len(heap._arenas), 0) 3675 self.assertEqual(len(heap._allocated_blocks), 0, heap._allocated_blocks) 3676 self.assertEqual(len(heap._len_to_seq), 0) 3677 3678 def test_free_from_gc(self): 3679 # Check that freeing of blocks by the garbage collector doesn't deadlock 3680 # (issue #12352). 3681 # Make sure the GC is enabled, and set lower collection thresholds to 3682 # make collections more frequent (and increase the probability of 3683 # deadlock). 3684 if not gc.isenabled(): 3685 gc.enable() 3686 self.addCleanup(gc.disable) 3687 thresholds = gc.get_threshold() 3688 self.addCleanup(gc.set_threshold, *thresholds) 3689 gc.set_threshold(10) 3690 3691 # perform numerous block allocations, with cyclic references to make 3692 # sure objects are collected asynchronously by the gc 3693 for i in range(5000): 3694 a = multiprocessing.heap.BufferWrapper(1) 3695 b = multiprocessing.heap.BufferWrapper(1) 3696 # circular references 3697 a.buddy = b 3698 b.buddy = a 3699 3700# 3701# 3702# 3703 3704class _Foo(Structure): 3705 _fields_ = [ 3706 ('x', c_int), 3707 ('y', c_double), 3708 ('z', c_longlong,) 3709 ] 3710 3711class _TestSharedCTypes(BaseTestCase): 3712 3713 ALLOWED_TYPES = ('processes',) 3714 3715 def setUp(self): 3716 if not HAS_SHAREDCTYPES: 3717 self.skipTest("requires multiprocessing.sharedctypes") 3718 3719 @classmethod 3720 def _double(cls, x, y, z, foo, arr, string): 3721 x.value *= 2 3722 y.value *= 2 3723 z.value *= 2 3724 foo.x *= 2 3725 foo.y *= 2 3726 string.value *= 2 3727 for i in range(len(arr)): 3728 arr[i] *= 2 3729 3730 def test_sharedctypes(self, lock=False): 3731 x = Value('i', 7, lock=lock) 3732 y = Value(c_double, 1.0/3.0, lock=lock) 3733 z = Value(c_longlong, 2 ** 33, lock=lock) 3734 foo = Value(_Foo, 3, 2, lock=lock) 3735 arr = self.Array('d', list(range(10)), lock=lock) 3736 string = self.Array('c', 20, lock=lock) 3737 string.value = latin('hello') 3738 3739 p = self.Process(target=self._double, args=(x, y, z, foo, arr, string)) 3740 p.daemon = True 3741 p.start() 3742 p.join() 3743 3744 self.assertEqual(x.value, 14) 3745 self.assertAlmostEqual(y.value, 2.0/3.0) 3746 self.assertEqual(z.value, 2 ** 34) 3747 self.assertEqual(foo.x, 6) 3748 self.assertAlmostEqual(foo.y, 4.0) 3749 for i in range(10): 3750 self.assertAlmostEqual(arr[i], i*2) 3751 self.assertEqual(string.value, latin('hellohello')) 3752 3753 def test_synchronize(self): 3754 self.test_sharedctypes(lock=True) 3755 3756 def test_copy(self): 3757 foo = _Foo(2, 5.0, 2 ** 33) 3758 bar = copy(foo) 3759 foo.x = 0 3760 foo.y = 0 3761 foo.z = 0 3762 self.assertEqual(bar.x, 2) 3763 self.assertAlmostEqual(bar.y, 5.0) 3764 self.assertEqual(bar.z, 2 ** 33) 3765 3766 3767@unittest.skipUnless(HAS_SHMEM, "requires multiprocessing.shared_memory") 3768@hashlib_helper.requires_hashdigest('md5') 3769class _TestSharedMemory(BaseTestCase): 3770 3771 ALLOWED_TYPES = ('processes',) 3772 3773 @staticmethod 3774 def _attach_existing_shmem_then_write(shmem_name_or_obj, binary_data): 3775 if isinstance(shmem_name_or_obj, str): 3776 local_sms = shared_memory.SharedMemory(shmem_name_or_obj) 3777 else: 3778 local_sms = shmem_name_or_obj 3779 local_sms.buf[:len(binary_data)] = binary_data 3780 local_sms.close() 3781 3782 def _new_shm_name(self, prefix): 3783 # Add a PID to the name of a POSIX shared memory object to allow 3784 # running multiprocessing tests (test_multiprocessing_fork, 3785 # test_multiprocessing_spawn, etc) in parallel. 3786 return prefix + str(os.getpid()) 3787 3788 def test_shared_memory_basics(self): 3789 name_tsmb = self._new_shm_name('test01_tsmb') 3790 sms = shared_memory.SharedMemory(name_tsmb, create=True, size=512) 3791 self.addCleanup(sms.unlink) 3792 3793 # Verify attributes are readable. 3794 self.assertEqual(sms.name, name_tsmb) 3795 self.assertGreaterEqual(sms.size, 512) 3796 self.assertGreaterEqual(len(sms.buf), sms.size) 3797 3798 # Verify __repr__ 3799 self.assertIn(sms.name, str(sms)) 3800 self.assertIn(str(sms.size), str(sms)) 3801 3802 # Modify contents of shared memory segment through memoryview. 3803 sms.buf[0] = 42 3804 self.assertEqual(sms.buf[0], 42) 3805 3806 # Attach to existing shared memory segment. 3807 also_sms = shared_memory.SharedMemory(name_tsmb) 3808 self.assertEqual(also_sms.buf[0], 42) 3809 also_sms.close() 3810 3811 # Attach to existing shared memory segment but specify a new size. 3812 same_sms = shared_memory.SharedMemory(name_tsmb, size=20*sms.size) 3813 self.assertLess(same_sms.size, 20*sms.size) # Size was ignored. 3814 same_sms.close() 3815 3816 # Creating Shared Memory Segment with -ve size 3817 with self.assertRaises(ValueError): 3818 shared_memory.SharedMemory(create=True, size=-2) 3819 3820 # Attaching Shared Memory Segment without a name 3821 with self.assertRaises(ValueError): 3822 shared_memory.SharedMemory(create=False) 3823 3824 # Test if shared memory segment is created properly, 3825 # when _make_filename returns an existing shared memory segment name 3826 with unittest.mock.patch( 3827 'multiprocessing.shared_memory._make_filename') as mock_make_filename: 3828 3829 NAME_PREFIX = shared_memory._SHM_NAME_PREFIX 3830 names = [self._new_shm_name('test01_fn'), self._new_shm_name('test02_fn')] 3831 # Prepend NAME_PREFIX which can be '/psm_' or 'wnsm_', necessary 3832 # because some POSIX compliant systems require name to start with / 3833 names = [NAME_PREFIX + name for name in names] 3834 3835 mock_make_filename.side_effect = names 3836 shm1 = shared_memory.SharedMemory(create=True, size=1) 3837 self.addCleanup(shm1.unlink) 3838 self.assertEqual(shm1._name, names[0]) 3839 3840 mock_make_filename.side_effect = names 3841 shm2 = shared_memory.SharedMemory(create=True, size=1) 3842 self.addCleanup(shm2.unlink) 3843 self.assertEqual(shm2._name, names[1]) 3844 3845 if shared_memory._USE_POSIX: 3846 # Posix Shared Memory can only be unlinked once. Here we 3847 # test an implementation detail that is not observed across 3848 # all supported platforms (since WindowsNamedSharedMemory 3849 # manages unlinking on its own and unlink() does nothing). 3850 # True release of shared memory segment does not necessarily 3851 # happen until process exits, depending on the OS platform. 3852 name_dblunlink = self._new_shm_name('test01_dblunlink') 3853 sms_uno = shared_memory.SharedMemory( 3854 name_dblunlink, 3855 create=True, 3856 size=5000 3857 ) 3858 with self.assertRaises(FileNotFoundError): 3859 try: 3860 self.assertGreaterEqual(sms_uno.size, 5000) 3861 3862 sms_duo = shared_memory.SharedMemory(name_dblunlink) 3863 sms_duo.unlink() # First shm_unlink() call. 3864 sms_duo.close() 3865 sms_uno.close() 3866 3867 finally: 3868 sms_uno.unlink() # A second shm_unlink() call is bad. 3869 3870 with self.assertRaises(FileExistsError): 3871 # Attempting to create a new shared memory segment with a 3872 # name that is already in use triggers an exception. 3873 there_can_only_be_one_sms = shared_memory.SharedMemory( 3874 name_tsmb, 3875 create=True, 3876 size=512 3877 ) 3878 3879 if shared_memory._USE_POSIX: 3880 # Requesting creation of a shared memory segment with the option 3881 # to attach to an existing segment, if that name is currently in 3882 # use, should not trigger an exception. 3883 # Note: Using a smaller size could possibly cause truncation of 3884 # the existing segment but is OS platform dependent. In the 3885 # case of MacOS/darwin, requesting a smaller size is disallowed. 3886 class OptionalAttachSharedMemory(shared_memory.SharedMemory): 3887 _flags = os.O_CREAT | os.O_RDWR 3888 ok_if_exists_sms = OptionalAttachSharedMemory(name_tsmb) 3889 self.assertEqual(ok_if_exists_sms.size, sms.size) 3890 ok_if_exists_sms.close() 3891 3892 # Attempting to attach to an existing shared memory segment when 3893 # no segment exists with the supplied name triggers an exception. 3894 with self.assertRaises(FileNotFoundError): 3895 nonexisting_sms = shared_memory.SharedMemory('test01_notthere') 3896 nonexisting_sms.unlink() # Error should occur on prior line. 3897 3898 sms.close() 3899 3900 def test_shared_memory_recreate(self): 3901 # Test if shared memory segment is created properly, 3902 # when _make_filename returns an existing shared memory segment name 3903 with unittest.mock.patch( 3904 'multiprocessing.shared_memory._make_filename') as mock_make_filename: 3905 3906 NAME_PREFIX = shared_memory._SHM_NAME_PREFIX 3907 names = ['test01_fn', 'test02_fn'] 3908 # Prepend NAME_PREFIX which can be '/psm_' or 'wnsm_', necessary 3909 # because some POSIX compliant systems require name to start with / 3910 names = [NAME_PREFIX + name for name in names] 3911 3912 mock_make_filename.side_effect = names 3913 shm1 = shared_memory.SharedMemory(create=True, size=1) 3914 self.addCleanup(shm1.unlink) 3915 self.assertEqual(shm1._name, names[0]) 3916 3917 mock_make_filename.side_effect = names 3918 shm2 = shared_memory.SharedMemory(create=True, size=1) 3919 self.addCleanup(shm2.unlink) 3920 self.assertEqual(shm2._name, names[1]) 3921 3922 def test_invalid_shared_memory_cration(self): 3923 # Test creating a shared memory segment with negative size 3924 with self.assertRaises(ValueError): 3925 sms_invalid = shared_memory.SharedMemory(create=True, size=-1) 3926 3927 # Test creating a shared memory segment with size 0 3928 with self.assertRaises(ValueError): 3929 sms_invalid = shared_memory.SharedMemory(create=True, size=0) 3930 3931 # Test creating a shared memory segment without size argument 3932 with self.assertRaises(ValueError): 3933 sms_invalid = shared_memory.SharedMemory(create=True) 3934 3935 def test_shared_memory_pickle_unpickle(self): 3936 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 3937 with self.subTest(proto=proto): 3938 sms = shared_memory.SharedMemory(create=True, size=512) 3939 self.addCleanup(sms.unlink) 3940 sms.buf[0:6] = b'pickle' 3941 3942 # Test pickling 3943 pickled_sms = pickle.dumps(sms, protocol=proto) 3944 3945 # Test unpickling 3946 sms2 = pickle.loads(pickled_sms) 3947 self.assertIsInstance(sms2, shared_memory.SharedMemory) 3948 self.assertEqual(sms.name, sms2.name) 3949 self.assertEqual(bytes(sms.buf[0:6]), b'pickle') 3950 self.assertEqual(bytes(sms2.buf[0:6]), b'pickle') 3951 3952 # Test that unpickled version is still the same SharedMemory 3953 sms.buf[0:6] = b'newval' 3954 self.assertEqual(bytes(sms.buf[0:6]), b'newval') 3955 self.assertEqual(bytes(sms2.buf[0:6]), b'newval') 3956 3957 sms2.buf[0:6] = b'oldval' 3958 self.assertEqual(bytes(sms.buf[0:6]), b'oldval') 3959 self.assertEqual(bytes(sms2.buf[0:6]), b'oldval') 3960 3961 def test_shared_memory_pickle_unpickle_dead_object(self): 3962 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 3963 with self.subTest(proto=proto): 3964 sms = shared_memory.SharedMemory(create=True, size=512) 3965 sms.buf[0:6] = b'pickle' 3966 pickled_sms = pickle.dumps(sms, protocol=proto) 3967 3968 # Now, we are going to kill the original object. 3969 # So, unpickled one won't be able to attach to it. 3970 sms.close() 3971 sms.unlink() 3972 3973 with self.assertRaises(FileNotFoundError): 3974 pickle.loads(pickled_sms) 3975 3976 def test_shared_memory_across_processes(self): 3977 # bpo-40135: don't define shared memory block's name in case of 3978 # the failure when we run multiprocessing tests in parallel. 3979 sms = shared_memory.SharedMemory(create=True, size=512) 3980 self.addCleanup(sms.unlink) 3981 3982 # Verify remote attachment to existing block by name is working. 3983 p = self.Process( 3984 target=self._attach_existing_shmem_then_write, 3985 args=(sms.name, b'howdy') 3986 ) 3987 p.daemon = True 3988 p.start() 3989 p.join() 3990 self.assertEqual(bytes(sms.buf[:5]), b'howdy') 3991 3992 # Verify pickling of SharedMemory instance also works. 3993 p = self.Process( 3994 target=self._attach_existing_shmem_then_write, 3995 args=(sms, b'HELLO') 3996 ) 3997 p.daemon = True 3998 p.start() 3999 p.join() 4000 self.assertEqual(bytes(sms.buf[:5]), b'HELLO') 4001 4002 sms.close() 4003 4004 @unittest.skipIf(os.name != "posix", "not feasible in non-posix platforms") 4005 def test_shared_memory_SharedMemoryServer_ignores_sigint(self): 4006 # bpo-36368: protect SharedMemoryManager server process from 4007 # KeyboardInterrupt signals. 4008 smm = multiprocessing.managers.SharedMemoryManager() 4009 smm.start() 4010 4011 # make sure the manager works properly at the beginning 4012 sl = smm.ShareableList(range(10)) 4013 4014 # the manager's server should ignore KeyboardInterrupt signals, and 4015 # maintain its connection with the current process, and success when 4016 # asked to deliver memory segments. 4017 os.kill(smm._process.pid, signal.SIGINT) 4018 4019 sl2 = smm.ShareableList(range(10)) 4020 4021 # test that the custom signal handler registered in the Manager does 4022 # not affect signal handling in the parent process. 4023 with self.assertRaises(KeyboardInterrupt): 4024 os.kill(os.getpid(), signal.SIGINT) 4025 4026 smm.shutdown() 4027 4028 @unittest.skipIf(os.name != "posix", "resource_tracker is posix only") 4029 def test_shared_memory_SharedMemoryManager_reuses_resource_tracker(self): 4030 # bpo-36867: test that a SharedMemoryManager uses the 4031 # same resource_tracker process as its parent. 4032 cmd = '''if 1: 4033 from multiprocessing.managers import SharedMemoryManager 4034 4035 4036 smm = SharedMemoryManager() 4037 smm.start() 4038 sl = smm.ShareableList(range(10)) 4039 smm.shutdown() 4040 ''' 4041 rc, out, err = test.support.script_helper.assert_python_ok('-c', cmd) 4042 4043 # Before bpo-36867 was fixed, a SharedMemoryManager not using the same 4044 # resource_tracker process as its parent would make the parent's 4045 # tracker complain about sl being leaked even though smm.shutdown() 4046 # properly released sl. 4047 self.assertFalse(err) 4048 4049 def test_shared_memory_SharedMemoryManager_basics(self): 4050 smm1 = multiprocessing.managers.SharedMemoryManager() 4051 with self.assertRaises(ValueError): 4052 smm1.SharedMemory(size=9) # Fails if SharedMemoryServer not started 4053 smm1.start() 4054 lol = [ smm1.ShareableList(range(i)) for i in range(5, 10) ] 4055 lom = [ smm1.SharedMemory(size=j) for j in range(32, 128, 16) ] 4056 doppleganger_list0 = shared_memory.ShareableList(name=lol[0].shm.name) 4057 self.assertEqual(len(doppleganger_list0), 5) 4058 doppleganger_shm0 = shared_memory.SharedMemory(name=lom[0].name) 4059 self.assertGreaterEqual(len(doppleganger_shm0.buf), 32) 4060 held_name = lom[0].name 4061 smm1.shutdown() 4062 if sys.platform != "win32": 4063 # Calls to unlink() have no effect on Windows platform; shared 4064 # memory will only be released once final process exits. 4065 with self.assertRaises(FileNotFoundError): 4066 # No longer there to be attached to again. 4067 absent_shm = shared_memory.SharedMemory(name=held_name) 4068 4069 with multiprocessing.managers.SharedMemoryManager() as smm2: 4070 sl = smm2.ShareableList("howdy") 4071 shm = smm2.SharedMemory(size=128) 4072 held_name = sl.shm.name 4073 if sys.platform != "win32": 4074 with self.assertRaises(FileNotFoundError): 4075 # No longer there to be attached to again. 4076 absent_sl = shared_memory.ShareableList(name=held_name) 4077 4078 4079 def test_shared_memory_ShareableList_basics(self): 4080 sl = shared_memory.ShareableList( 4081 ['howdy', b'HoWdY', -273.154, 100, None, True, 42] 4082 ) 4083 self.addCleanup(sl.shm.unlink) 4084 4085 # Verify __repr__ 4086 self.assertIn(sl.shm.name, str(sl)) 4087 self.assertIn(str(list(sl)), str(sl)) 4088 4089 # Index Out of Range (get) 4090 with self.assertRaises(IndexError): 4091 sl[7] 4092 4093 # Index Out of Range (set) 4094 with self.assertRaises(IndexError): 4095 sl[7] = 2 4096 4097 # Assign value without format change (str -> str) 4098 current_format = sl._get_packing_format(0) 4099 sl[0] = 'howdy' 4100 self.assertEqual(current_format, sl._get_packing_format(0)) 4101 4102 # Verify attributes are readable. 4103 self.assertEqual(sl.format, '8s8sdqxxxxxx?xxxxxxxx?q') 4104 4105 # Exercise len(). 4106 self.assertEqual(len(sl), 7) 4107 4108 # Exercise index(). 4109 with warnings.catch_warnings(): 4110 # Suppress BytesWarning when comparing against b'HoWdY'. 4111 warnings.simplefilter('ignore') 4112 with self.assertRaises(ValueError): 4113 sl.index('100') 4114 self.assertEqual(sl.index(100), 3) 4115 4116 # Exercise retrieving individual values. 4117 self.assertEqual(sl[0], 'howdy') 4118 self.assertEqual(sl[-2], True) 4119 4120 # Exercise iterability. 4121 self.assertEqual( 4122 tuple(sl), 4123 ('howdy', b'HoWdY', -273.154, 100, None, True, 42) 4124 ) 4125 4126 # Exercise modifying individual values. 4127 sl[3] = 42 4128 self.assertEqual(sl[3], 42) 4129 sl[4] = 'some' # Change type at a given position. 4130 self.assertEqual(sl[4], 'some') 4131 self.assertEqual(sl.format, '8s8sdq8sxxxxxxx?q') 4132 with self.assertRaisesRegex(ValueError, 4133 "exceeds available storage"): 4134 sl[4] = 'far too many' 4135 self.assertEqual(sl[4], 'some') 4136 sl[0] = 'encodés' # Exactly 8 bytes of UTF-8 data 4137 self.assertEqual(sl[0], 'encodés') 4138 self.assertEqual(sl[1], b'HoWdY') # no spillage 4139 with self.assertRaisesRegex(ValueError, 4140 "exceeds available storage"): 4141 sl[0] = 'encodées' # Exactly 9 bytes of UTF-8 data 4142 self.assertEqual(sl[1], b'HoWdY') 4143 with self.assertRaisesRegex(ValueError, 4144 "exceeds available storage"): 4145 sl[1] = b'123456789' 4146 self.assertEqual(sl[1], b'HoWdY') 4147 4148 # Exercise count(). 4149 with warnings.catch_warnings(): 4150 # Suppress BytesWarning when comparing against b'HoWdY'. 4151 warnings.simplefilter('ignore') 4152 self.assertEqual(sl.count(42), 2) 4153 self.assertEqual(sl.count(b'HoWdY'), 1) 4154 self.assertEqual(sl.count(b'adios'), 0) 4155 4156 # Exercise creating a duplicate. 4157 name_duplicate = self._new_shm_name('test03_duplicate') 4158 sl_copy = shared_memory.ShareableList(sl, name=name_duplicate) 4159 try: 4160 self.assertNotEqual(sl.shm.name, sl_copy.shm.name) 4161 self.assertEqual(name_duplicate, sl_copy.shm.name) 4162 self.assertEqual(list(sl), list(sl_copy)) 4163 self.assertEqual(sl.format, sl_copy.format) 4164 sl_copy[-1] = 77 4165 self.assertEqual(sl_copy[-1], 77) 4166 self.assertNotEqual(sl[-1], 77) 4167 sl_copy.shm.close() 4168 finally: 4169 sl_copy.shm.unlink() 4170 4171 # Obtain a second handle on the same ShareableList. 4172 sl_tethered = shared_memory.ShareableList(name=sl.shm.name) 4173 self.assertEqual(sl.shm.name, sl_tethered.shm.name) 4174 sl_tethered[-1] = 880 4175 self.assertEqual(sl[-1], 880) 4176 sl_tethered.shm.close() 4177 4178 sl.shm.close() 4179 4180 # Exercise creating an empty ShareableList. 4181 empty_sl = shared_memory.ShareableList() 4182 try: 4183 self.assertEqual(len(empty_sl), 0) 4184 self.assertEqual(empty_sl.format, '') 4185 self.assertEqual(empty_sl.count('any'), 0) 4186 with self.assertRaises(ValueError): 4187 empty_sl.index(None) 4188 empty_sl.shm.close() 4189 finally: 4190 empty_sl.shm.unlink() 4191 4192 def test_shared_memory_ShareableList_pickling(self): 4193 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 4194 with self.subTest(proto=proto): 4195 sl = shared_memory.ShareableList(range(10)) 4196 self.addCleanup(sl.shm.unlink) 4197 4198 serialized_sl = pickle.dumps(sl, protocol=proto) 4199 deserialized_sl = pickle.loads(serialized_sl) 4200 self.assertIsInstance( 4201 deserialized_sl, shared_memory.ShareableList) 4202 self.assertEqual(deserialized_sl[-1], 9) 4203 self.assertIsNot(sl, deserialized_sl) 4204 4205 deserialized_sl[4] = "changed" 4206 self.assertEqual(sl[4], "changed") 4207 sl[3] = "newvalue" 4208 self.assertEqual(deserialized_sl[3], "newvalue") 4209 4210 larger_sl = shared_memory.ShareableList(range(400)) 4211 self.addCleanup(larger_sl.shm.unlink) 4212 serialized_larger_sl = pickle.dumps(larger_sl, protocol=proto) 4213 self.assertEqual(len(serialized_sl), len(serialized_larger_sl)) 4214 larger_sl.shm.close() 4215 4216 deserialized_sl.shm.close() 4217 sl.shm.close() 4218 4219 def test_shared_memory_ShareableList_pickling_dead_object(self): 4220 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 4221 with self.subTest(proto=proto): 4222 sl = shared_memory.ShareableList(range(10)) 4223 serialized_sl = pickle.dumps(sl, protocol=proto) 4224 4225 # Now, we are going to kill the original object. 4226 # So, unpickled one won't be able to attach to it. 4227 sl.shm.close() 4228 sl.shm.unlink() 4229 4230 with self.assertRaises(FileNotFoundError): 4231 pickle.loads(serialized_sl) 4232 4233 def test_shared_memory_cleaned_after_process_termination(self): 4234 cmd = '''if 1: 4235 import os, time, sys 4236 from multiprocessing import shared_memory 4237 4238 # Create a shared_memory segment, and send the segment name 4239 sm = shared_memory.SharedMemory(create=True, size=10) 4240 sys.stdout.write(sm.name + '\\n') 4241 sys.stdout.flush() 4242 time.sleep(100) 4243 ''' 4244 with subprocess.Popen([sys.executable, '-E', '-c', cmd], 4245 stdout=subprocess.PIPE, 4246 stderr=subprocess.PIPE) as p: 4247 name = p.stdout.readline().strip().decode() 4248 4249 # killing abruptly processes holding reference to a shared memory 4250 # segment should not leak the given memory segment. 4251 p.terminate() 4252 p.wait() 4253 4254 deadline = time.monotonic() + support.LONG_TIMEOUT 4255 t = 0.1 4256 while time.monotonic() < deadline: 4257 time.sleep(t) 4258 t = min(t*2, 5) 4259 try: 4260 smm = shared_memory.SharedMemory(name, create=False) 4261 except FileNotFoundError: 4262 break 4263 else: 4264 raise AssertionError("A SharedMemory segment was leaked after" 4265 " a process was abruptly terminated.") 4266 4267 if os.name == 'posix': 4268 # Without this line it was raising warnings like: 4269 # UserWarning: resource_tracker: 4270 # There appear to be 1 leaked shared_memory 4271 # objects to clean up at shutdown 4272 # See: https://bugs.python.org/issue45209 4273 resource_tracker.unregister(f"/{name}", "shared_memory") 4274 4275 # A warning was emitted by the subprocess' own 4276 # resource_tracker (on Windows, shared memory segments 4277 # are released automatically by the OS). 4278 err = p.stderr.read().decode() 4279 self.assertIn( 4280 "resource_tracker: There appear to be 1 leaked " 4281 "shared_memory objects to clean up at shutdown", err) 4282 4283# 4284# Test to verify that `Finalize` works. 4285# 4286 4287class _TestFinalize(BaseTestCase): 4288 4289 ALLOWED_TYPES = ('processes',) 4290 4291 def setUp(self): 4292 self.registry_backup = util._finalizer_registry.copy() 4293 util._finalizer_registry.clear() 4294 4295 def tearDown(self): 4296 gc.collect() # For PyPy or other GCs. 4297 self.assertFalse(util._finalizer_registry) 4298 util._finalizer_registry.update(self.registry_backup) 4299 4300 @classmethod 4301 def _test_finalize(cls, conn): 4302 class Foo(object): 4303 pass 4304 4305 a = Foo() 4306 util.Finalize(a, conn.send, args=('a',)) 4307 del a # triggers callback for a 4308 gc.collect() # For PyPy or other GCs. 4309 4310 b = Foo() 4311 close_b = util.Finalize(b, conn.send, args=('b',)) 4312 close_b() # triggers callback for b 4313 close_b() # does nothing because callback has already been called 4314 del b # does nothing because callback has already been called 4315 gc.collect() # For PyPy or other GCs. 4316 4317 c = Foo() 4318 util.Finalize(c, conn.send, args=('c',)) 4319 4320 d10 = Foo() 4321 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1) 4322 4323 d01 = Foo() 4324 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0) 4325 d02 = Foo() 4326 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0) 4327 d03 = Foo() 4328 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0) 4329 4330 util.Finalize(None, conn.send, args=('e',), exitpriority=-10) 4331 4332 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100) 4333 4334 # call multiprocessing's cleanup function then exit process without 4335 # garbage collecting locals 4336 util._exit_function() 4337 conn.close() 4338 os._exit(0) 4339 4340 def test_finalize(self): 4341 conn, child_conn = self.Pipe() 4342 4343 p = self.Process(target=self._test_finalize, args=(child_conn,)) 4344 p.daemon = True 4345 p.start() 4346 p.join() 4347 4348 result = [obj for obj in iter(conn.recv, 'STOP')] 4349 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e']) 4350 4351 def test_thread_safety(self): 4352 # bpo-24484: _run_finalizers() should be thread-safe 4353 def cb(): 4354 pass 4355 4356 class Foo(object): 4357 def __init__(self): 4358 self.ref = self # create reference cycle 4359 # insert finalizer at random key 4360 util.Finalize(self, cb, exitpriority=random.randint(1, 100)) 4361 4362 finish = False 4363 exc = None 4364 4365 def run_finalizers(): 4366 nonlocal exc 4367 while not finish: 4368 time.sleep(random.random() * 1e-1) 4369 try: 4370 # A GC run will eventually happen during this, 4371 # collecting stale Foo's and mutating the registry 4372 util._run_finalizers() 4373 except Exception as e: 4374 exc = e 4375 4376 def make_finalizers(): 4377 nonlocal exc 4378 d = {} 4379 while not finish: 4380 try: 4381 # Old Foo's get gradually replaced and later 4382 # collected by the GC (because of the cyclic ref) 4383 d[random.getrandbits(5)] = {Foo() for i in range(10)} 4384 except Exception as e: 4385 exc = e 4386 d.clear() 4387 4388 old_interval = sys.getswitchinterval() 4389 old_threshold = gc.get_threshold() 4390 try: 4391 sys.setswitchinterval(1e-6) 4392 gc.set_threshold(5, 5, 5) 4393 threads = [threading.Thread(target=run_finalizers), 4394 threading.Thread(target=make_finalizers)] 4395 with threading_helper.start_threads(threads): 4396 time.sleep(4.0) # Wait a bit to trigger race condition 4397 finish = True 4398 if exc is not None: 4399 raise exc 4400 finally: 4401 sys.setswitchinterval(old_interval) 4402 gc.set_threshold(*old_threshold) 4403 gc.collect() # Collect remaining Foo's 4404 4405 4406# 4407# Test that from ... import * works for each module 4408# 4409 4410class _TestImportStar(unittest.TestCase): 4411 4412 def get_module_names(self): 4413 import glob 4414 folder = os.path.dirname(multiprocessing.__file__) 4415 pattern = os.path.join(glob.escape(folder), '*.py') 4416 files = glob.glob(pattern) 4417 modules = [os.path.splitext(os.path.split(f)[1])[0] for f in files] 4418 modules = ['multiprocessing.' + m for m in modules] 4419 modules.remove('multiprocessing.__init__') 4420 modules.append('multiprocessing') 4421 return modules 4422 4423 def test_import(self): 4424 modules = self.get_module_names() 4425 if sys.platform == 'win32': 4426 modules.remove('multiprocessing.popen_fork') 4427 modules.remove('multiprocessing.popen_forkserver') 4428 modules.remove('multiprocessing.popen_spawn_posix') 4429 else: 4430 modules.remove('multiprocessing.popen_spawn_win32') 4431 if not HAS_REDUCTION: 4432 modules.remove('multiprocessing.popen_forkserver') 4433 4434 if c_int is None: 4435 # This module requires _ctypes 4436 modules.remove('multiprocessing.sharedctypes') 4437 4438 for name in modules: 4439 __import__(name) 4440 mod = sys.modules[name] 4441 self.assertTrue(hasattr(mod, '__all__'), name) 4442 4443 for attr in mod.__all__: 4444 self.assertTrue( 4445 hasattr(mod, attr), 4446 '%r does not have attribute %r' % (mod, attr) 4447 ) 4448 4449# 4450# Quick test that logging works -- does not test logging output 4451# 4452 4453class _TestLogging(BaseTestCase): 4454 4455 ALLOWED_TYPES = ('processes',) 4456 4457 def test_enable_logging(self): 4458 logger = multiprocessing.get_logger() 4459 logger.setLevel(util.SUBWARNING) 4460 self.assertTrue(logger is not None) 4461 logger.debug('this will not be printed') 4462 logger.info('nor will this') 4463 logger.setLevel(LOG_LEVEL) 4464 4465 @classmethod 4466 def _test_level(cls, conn): 4467 logger = multiprocessing.get_logger() 4468 conn.send(logger.getEffectiveLevel()) 4469 4470 def test_level(self): 4471 LEVEL1 = 32 4472 LEVEL2 = 37 4473 4474 logger = multiprocessing.get_logger() 4475 root_logger = logging.getLogger() 4476 root_level = root_logger.level 4477 4478 reader, writer = multiprocessing.Pipe(duplex=False) 4479 4480 logger.setLevel(LEVEL1) 4481 p = self.Process(target=self._test_level, args=(writer,)) 4482 p.start() 4483 self.assertEqual(LEVEL1, reader.recv()) 4484 p.join() 4485 p.close() 4486 4487 logger.setLevel(logging.NOTSET) 4488 root_logger.setLevel(LEVEL2) 4489 p = self.Process(target=self._test_level, args=(writer,)) 4490 p.start() 4491 self.assertEqual(LEVEL2, reader.recv()) 4492 p.join() 4493 p.close() 4494 4495 root_logger.setLevel(root_level) 4496 logger.setLevel(level=LOG_LEVEL) 4497 4498 4499# class _TestLoggingProcessName(BaseTestCase): 4500# 4501# def handle(self, record): 4502# assert record.processName == multiprocessing.current_process().name 4503# self.__handled = True 4504# 4505# def test_logging(self): 4506# handler = logging.Handler() 4507# handler.handle = self.handle 4508# self.__handled = False 4509# # Bypass getLogger() and side-effects 4510# logger = logging.getLoggerClass()( 4511# 'multiprocessing.test.TestLoggingProcessName') 4512# logger.addHandler(handler) 4513# logger.propagate = False 4514# 4515# logger.warn('foo') 4516# assert self.__handled 4517 4518# 4519# Check that Process.join() retries if os.waitpid() fails with EINTR 4520# 4521 4522class _TestPollEintr(BaseTestCase): 4523 4524 ALLOWED_TYPES = ('processes',) 4525 4526 @classmethod 4527 def _killer(cls, pid): 4528 time.sleep(0.1) 4529 os.kill(pid, signal.SIGUSR1) 4530 4531 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') 4532 def test_poll_eintr(self): 4533 got_signal = [False] 4534 def record(*args): 4535 got_signal[0] = True 4536 pid = os.getpid() 4537 oldhandler = signal.signal(signal.SIGUSR1, record) 4538 try: 4539 killer = self.Process(target=self._killer, args=(pid,)) 4540 killer.start() 4541 try: 4542 p = self.Process(target=time.sleep, args=(2,)) 4543 p.start() 4544 p.join() 4545 finally: 4546 killer.join() 4547 self.assertTrue(got_signal[0]) 4548 self.assertEqual(p.exitcode, 0) 4549 finally: 4550 signal.signal(signal.SIGUSR1, oldhandler) 4551 4552# 4553# Test to verify handle verification, see issue 3321 4554# 4555 4556class TestInvalidHandle(unittest.TestCase): 4557 4558 @unittest.skipIf(WIN32, "skipped on Windows") 4559 def test_invalid_handles(self): 4560 conn = multiprocessing.connection.Connection(44977608) 4561 # check that poll() doesn't crash 4562 try: 4563 conn.poll() 4564 except (ValueError, OSError): 4565 pass 4566 finally: 4567 # Hack private attribute _handle to avoid printing an error 4568 # in conn.__del__ 4569 conn._handle = None 4570 self.assertRaises((ValueError, OSError), 4571 multiprocessing.connection.Connection, -1) 4572 4573 4574 4575@hashlib_helper.requires_hashdigest('md5') 4576class OtherTest(unittest.TestCase): 4577 # TODO: add more tests for deliver/answer challenge. 4578 def test_deliver_challenge_auth_failure(self): 4579 class _FakeConnection(object): 4580 def recv_bytes(self, size): 4581 return b'something bogus' 4582 def send_bytes(self, data): 4583 pass 4584 self.assertRaises(multiprocessing.AuthenticationError, 4585 multiprocessing.connection.deliver_challenge, 4586 _FakeConnection(), b'abc') 4587 4588 def test_answer_challenge_auth_failure(self): 4589 class _FakeConnection(object): 4590 def __init__(self): 4591 self.count = 0 4592 def recv_bytes(self, size): 4593 self.count += 1 4594 if self.count == 1: 4595 return multiprocessing.connection.CHALLENGE 4596 elif self.count == 2: 4597 return b'something bogus' 4598 return b'' 4599 def send_bytes(self, data): 4600 pass 4601 self.assertRaises(multiprocessing.AuthenticationError, 4602 multiprocessing.connection.answer_challenge, 4603 _FakeConnection(), b'abc') 4604 4605# 4606# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585 4607# 4608 4609def initializer(ns): 4610 ns.test += 1 4611 4612@hashlib_helper.requires_hashdigest('md5') 4613class TestInitializers(unittest.TestCase): 4614 def setUp(self): 4615 self.mgr = multiprocessing.Manager() 4616 self.ns = self.mgr.Namespace() 4617 self.ns.test = 0 4618 4619 def tearDown(self): 4620 self.mgr.shutdown() 4621 self.mgr.join() 4622 4623 def test_manager_initializer(self): 4624 m = multiprocessing.managers.SyncManager() 4625 self.assertRaises(TypeError, m.start, 1) 4626 m.start(initializer, (self.ns,)) 4627 self.assertEqual(self.ns.test, 1) 4628 m.shutdown() 4629 m.join() 4630 4631 def test_pool_initializer(self): 4632 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1) 4633 p = multiprocessing.Pool(1, initializer, (self.ns,)) 4634 p.close() 4635 p.join() 4636 self.assertEqual(self.ns.test, 1) 4637 4638# 4639# Issue 5155, 5313, 5331: Test process in processes 4640# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior 4641# 4642 4643def _this_sub_process(q): 4644 try: 4645 item = q.get(block=False) 4646 except pyqueue.Empty: 4647 pass 4648 4649def _test_process(): 4650 queue = multiprocessing.Queue() 4651 subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,)) 4652 subProc.daemon = True 4653 subProc.start() 4654 subProc.join() 4655 4656def _afunc(x): 4657 return x*x 4658 4659def pool_in_process(): 4660 pool = multiprocessing.Pool(processes=4) 4661 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7]) 4662 pool.close() 4663 pool.join() 4664 4665class _file_like(object): 4666 def __init__(self, delegate): 4667 self._delegate = delegate 4668 self._pid = None 4669 4670 @property 4671 def cache(self): 4672 pid = os.getpid() 4673 # There are no race conditions since fork keeps only the running thread 4674 if pid != self._pid: 4675 self._pid = pid 4676 self._cache = [] 4677 return self._cache 4678 4679 def write(self, data): 4680 self.cache.append(data) 4681 4682 def flush(self): 4683 self._delegate.write(''.join(self.cache)) 4684 self._cache = [] 4685 4686class TestStdinBadfiledescriptor(unittest.TestCase): 4687 4688 def test_queue_in_process(self): 4689 proc = multiprocessing.Process(target=_test_process) 4690 proc.start() 4691 proc.join() 4692 4693 def test_pool_in_process(self): 4694 p = multiprocessing.Process(target=pool_in_process) 4695 p.start() 4696 p.join() 4697 4698 def test_flushing(self): 4699 sio = io.StringIO() 4700 flike = _file_like(sio) 4701 flike.write('foo') 4702 proc = multiprocessing.Process(target=lambda: flike.flush()) 4703 flike.flush() 4704 assert sio.getvalue() == 'foo' 4705 4706 4707class TestWait(unittest.TestCase): 4708 4709 @classmethod 4710 def _child_test_wait(cls, w, slow): 4711 for i in range(10): 4712 if slow: 4713 time.sleep(random.random()*0.1) 4714 w.send((i, os.getpid())) 4715 w.close() 4716 4717 def test_wait(self, slow=False): 4718 from multiprocessing.connection import wait 4719 readers = [] 4720 procs = [] 4721 messages = [] 4722 4723 for i in range(4): 4724 r, w = multiprocessing.Pipe(duplex=False) 4725 p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow)) 4726 p.daemon = True 4727 p.start() 4728 w.close() 4729 readers.append(r) 4730 procs.append(p) 4731 self.addCleanup(p.join) 4732 4733 while readers: 4734 for r in wait(readers): 4735 try: 4736 msg = r.recv() 4737 except EOFError: 4738 readers.remove(r) 4739 r.close() 4740 else: 4741 messages.append(msg) 4742 4743 messages.sort() 4744 expected = sorted((i, p.pid) for i in range(10) for p in procs) 4745 self.assertEqual(messages, expected) 4746 4747 @classmethod 4748 def _child_test_wait_socket(cls, address, slow): 4749 s = socket.socket() 4750 s.connect(address) 4751 for i in range(10): 4752 if slow: 4753 time.sleep(random.random()*0.1) 4754 s.sendall(('%s\n' % i).encode('ascii')) 4755 s.close() 4756 4757 def test_wait_socket(self, slow=False): 4758 from multiprocessing.connection import wait 4759 l = socket.create_server((socket_helper.HOST, 0)) 4760 addr = l.getsockname() 4761 readers = [] 4762 procs = [] 4763 dic = {} 4764 4765 for i in range(4): 4766 p = multiprocessing.Process(target=self._child_test_wait_socket, 4767 args=(addr, slow)) 4768 p.daemon = True 4769 p.start() 4770 procs.append(p) 4771 self.addCleanup(p.join) 4772 4773 for i in range(4): 4774 r, _ = l.accept() 4775 readers.append(r) 4776 dic[r] = [] 4777 l.close() 4778 4779 while readers: 4780 for r in wait(readers): 4781 msg = r.recv(32) 4782 if not msg: 4783 readers.remove(r) 4784 r.close() 4785 else: 4786 dic[r].append(msg) 4787 4788 expected = ''.join('%s\n' % i for i in range(10)).encode('ascii') 4789 for v in dic.values(): 4790 self.assertEqual(b''.join(v), expected) 4791 4792 def test_wait_slow(self): 4793 self.test_wait(True) 4794 4795 def test_wait_socket_slow(self): 4796 self.test_wait_socket(True) 4797 4798 def test_wait_timeout(self): 4799 from multiprocessing.connection import wait 4800 4801 expected = 5 4802 a, b = multiprocessing.Pipe() 4803 4804 start = time.monotonic() 4805 res = wait([a, b], expected) 4806 delta = time.monotonic() - start 4807 4808 self.assertEqual(res, []) 4809 self.assertLess(delta, expected * 2) 4810 self.assertGreater(delta, expected * 0.5) 4811 4812 b.send(None) 4813 4814 start = time.monotonic() 4815 res = wait([a, b], 20) 4816 delta = time.monotonic() - start 4817 4818 self.assertEqual(res, [a]) 4819 self.assertLess(delta, 0.4) 4820 4821 @classmethod 4822 def signal_and_sleep(cls, sem, period): 4823 sem.release() 4824 time.sleep(period) 4825 4826 def test_wait_integer(self): 4827 from multiprocessing.connection import wait 4828 4829 expected = 3 4830 sorted_ = lambda l: sorted(l, key=lambda x: id(x)) 4831 sem = multiprocessing.Semaphore(0) 4832 a, b = multiprocessing.Pipe() 4833 p = multiprocessing.Process(target=self.signal_and_sleep, 4834 args=(sem, expected)) 4835 4836 p.start() 4837 self.assertIsInstance(p.sentinel, int) 4838 self.assertTrue(sem.acquire(timeout=20)) 4839 4840 start = time.monotonic() 4841 res = wait([a, p.sentinel, b], expected + 20) 4842 delta = time.monotonic() - start 4843 4844 self.assertEqual(res, [p.sentinel]) 4845 self.assertLess(delta, expected + 2) 4846 self.assertGreater(delta, expected - 2) 4847 4848 a.send(None) 4849 4850 start = time.monotonic() 4851 res = wait([a, p.sentinel, b], 20) 4852 delta = time.monotonic() - start 4853 4854 self.assertEqual(sorted_(res), sorted_([p.sentinel, b])) 4855 self.assertLess(delta, 0.4) 4856 4857 b.send(None) 4858 4859 start = time.monotonic() 4860 res = wait([a, p.sentinel, b], 20) 4861 delta = time.monotonic() - start 4862 4863 self.assertEqual(sorted_(res), sorted_([a, p.sentinel, b])) 4864 self.assertLess(delta, 0.4) 4865 4866 p.terminate() 4867 p.join() 4868 4869 def test_neg_timeout(self): 4870 from multiprocessing.connection import wait 4871 a, b = multiprocessing.Pipe() 4872 t = time.monotonic() 4873 res = wait([a], timeout=-1) 4874 t = time.monotonic() - t 4875 self.assertEqual(res, []) 4876 self.assertLess(t, 1) 4877 a.close() 4878 b.close() 4879 4880# 4881# Issue 14151: Test invalid family on invalid environment 4882# 4883 4884class TestInvalidFamily(unittest.TestCase): 4885 4886 @unittest.skipIf(WIN32, "skipped on Windows") 4887 def test_invalid_family(self): 4888 with self.assertRaises(ValueError): 4889 multiprocessing.connection.Listener(r'\\.\test') 4890 4891 @unittest.skipUnless(WIN32, "skipped on non-Windows platforms") 4892 def test_invalid_family_win32(self): 4893 with self.assertRaises(ValueError): 4894 multiprocessing.connection.Listener('/var/test.pipe') 4895 4896# 4897# Issue 12098: check sys.flags of child matches that for parent 4898# 4899 4900class TestFlags(unittest.TestCase): 4901 @classmethod 4902 def run_in_grandchild(cls, conn): 4903 conn.send(tuple(sys.flags)) 4904 4905 @classmethod 4906 def run_in_child(cls): 4907 import json 4908 r, w = multiprocessing.Pipe(duplex=False) 4909 p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,)) 4910 p.start() 4911 grandchild_flags = r.recv() 4912 p.join() 4913 r.close() 4914 w.close() 4915 flags = (tuple(sys.flags), grandchild_flags) 4916 print(json.dumps(flags)) 4917 4918 def test_flags(self): 4919 import json 4920 # start child process using unusual flags 4921 prog = ('from test._test_multiprocessing import TestFlags; ' + 4922 'TestFlags.run_in_child()') 4923 data = subprocess.check_output( 4924 [sys.executable, '-E', '-S', '-O', '-c', prog]) 4925 child_flags, grandchild_flags = json.loads(data.decode('ascii')) 4926 self.assertEqual(child_flags, grandchild_flags) 4927 4928# 4929# Test interaction with socket timeouts - see Issue #6056 4930# 4931 4932class TestTimeouts(unittest.TestCase): 4933 @classmethod 4934 def _test_timeout(cls, child, address): 4935 time.sleep(1) 4936 child.send(123) 4937 child.close() 4938 conn = multiprocessing.connection.Client(address) 4939 conn.send(456) 4940 conn.close() 4941 4942 def test_timeout(self): 4943 old_timeout = socket.getdefaulttimeout() 4944 try: 4945 socket.setdefaulttimeout(0.1) 4946 parent, child = multiprocessing.Pipe(duplex=True) 4947 l = multiprocessing.connection.Listener(family='AF_INET') 4948 p = multiprocessing.Process(target=self._test_timeout, 4949 args=(child, l.address)) 4950 p.start() 4951 child.close() 4952 self.assertEqual(parent.recv(), 123) 4953 parent.close() 4954 conn = l.accept() 4955 self.assertEqual(conn.recv(), 456) 4956 conn.close() 4957 l.close() 4958 join_process(p) 4959 finally: 4960 socket.setdefaulttimeout(old_timeout) 4961 4962# 4963# Test what happens with no "if __name__ == '__main__'" 4964# 4965 4966class TestNoForkBomb(unittest.TestCase): 4967 def test_noforkbomb(self): 4968 sm = multiprocessing.get_start_method() 4969 name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py') 4970 if sm != 'fork': 4971 rc, out, err = test.support.script_helper.assert_python_failure(name, sm) 4972 self.assertEqual(out, b'') 4973 self.assertIn(b'RuntimeError', err) 4974 else: 4975 rc, out, err = test.support.script_helper.assert_python_ok(name, sm) 4976 self.assertEqual(out.rstrip(), b'123') 4977 self.assertEqual(err, b'') 4978 4979# 4980# Issue #17555: ForkAwareThreadLock 4981# 4982 4983class TestForkAwareThreadLock(unittest.TestCase): 4984 # We recursively start processes. Issue #17555 meant that the 4985 # after fork registry would get duplicate entries for the same 4986 # lock. The size of the registry at generation n was ~2**n. 4987 4988 @classmethod 4989 def child(cls, n, conn): 4990 if n > 1: 4991 p = multiprocessing.Process(target=cls.child, args=(n-1, conn)) 4992 p.start() 4993 conn.close() 4994 join_process(p) 4995 else: 4996 conn.send(len(util._afterfork_registry)) 4997 conn.close() 4998 4999 def test_lock(self): 5000 r, w = multiprocessing.Pipe(False) 5001 l = util.ForkAwareThreadLock() 5002 old_size = len(util._afterfork_registry) 5003 p = multiprocessing.Process(target=self.child, args=(5, w)) 5004 p.start() 5005 w.close() 5006 new_size = r.recv() 5007 join_process(p) 5008 self.assertLessEqual(new_size, old_size) 5009 5010# 5011# Check that non-forked child processes do not inherit unneeded fds/handles 5012# 5013 5014class TestCloseFds(unittest.TestCase): 5015 5016 def get_high_socket_fd(self): 5017 if WIN32: 5018 # The child process will not have any socket handles, so 5019 # calling socket.fromfd() should produce WSAENOTSOCK even 5020 # if there is a handle of the same number. 5021 return socket.socket().detach() 5022 else: 5023 # We want to produce a socket with an fd high enough that a 5024 # freshly created child process will not have any fds as high. 5025 fd = socket.socket().detach() 5026 to_close = [] 5027 while fd < 50: 5028 to_close.append(fd) 5029 fd = os.dup(fd) 5030 for x in to_close: 5031 os.close(x) 5032 return fd 5033 5034 def close(self, fd): 5035 if WIN32: 5036 socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=fd).close() 5037 else: 5038 os.close(fd) 5039 5040 @classmethod 5041 def _test_closefds(cls, conn, fd): 5042 try: 5043 s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM) 5044 except Exception as e: 5045 conn.send(e) 5046 else: 5047 s.close() 5048 conn.send(None) 5049 5050 def test_closefd(self): 5051 if not HAS_REDUCTION: 5052 raise unittest.SkipTest('requires fd pickling') 5053 5054 reader, writer = multiprocessing.Pipe() 5055 fd = self.get_high_socket_fd() 5056 try: 5057 p = multiprocessing.Process(target=self._test_closefds, 5058 args=(writer, fd)) 5059 p.start() 5060 writer.close() 5061 e = reader.recv() 5062 join_process(p) 5063 finally: 5064 self.close(fd) 5065 writer.close() 5066 reader.close() 5067 5068 if multiprocessing.get_start_method() == 'fork': 5069 self.assertIs(e, None) 5070 else: 5071 WSAENOTSOCK = 10038 5072 self.assertIsInstance(e, OSError) 5073 self.assertTrue(e.errno == errno.EBADF or 5074 e.winerror == WSAENOTSOCK, e) 5075 5076# 5077# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc 5078# 5079 5080class TestIgnoreEINTR(unittest.TestCase): 5081 5082 # Sending CONN_MAX_SIZE bytes into a multiprocessing pipe must block 5083 CONN_MAX_SIZE = max(support.PIPE_MAX_SIZE, support.SOCK_MAX_SIZE) 5084 5085 @classmethod 5086 def _test_ignore(cls, conn): 5087 def handler(signum, frame): 5088 pass 5089 signal.signal(signal.SIGUSR1, handler) 5090 conn.send('ready') 5091 x = conn.recv() 5092 conn.send(x) 5093 conn.send_bytes(b'x' * cls.CONN_MAX_SIZE) 5094 5095 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') 5096 def test_ignore(self): 5097 conn, child_conn = multiprocessing.Pipe() 5098 try: 5099 p = multiprocessing.Process(target=self._test_ignore, 5100 args=(child_conn,)) 5101 p.daemon = True 5102 p.start() 5103 child_conn.close() 5104 self.assertEqual(conn.recv(), 'ready') 5105 time.sleep(0.1) 5106 os.kill(p.pid, signal.SIGUSR1) 5107 time.sleep(0.1) 5108 conn.send(1234) 5109 self.assertEqual(conn.recv(), 1234) 5110 time.sleep(0.1) 5111 os.kill(p.pid, signal.SIGUSR1) 5112 self.assertEqual(conn.recv_bytes(), b'x' * self.CONN_MAX_SIZE) 5113 time.sleep(0.1) 5114 p.join() 5115 finally: 5116 conn.close() 5117 5118 @classmethod 5119 def _test_ignore_listener(cls, conn): 5120 def handler(signum, frame): 5121 pass 5122 signal.signal(signal.SIGUSR1, handler) 5123 with multiprocessing.connection.Listener() as l: 5124 conn.send(l.address) 5125 a = l.accept() 5126 a.send('welcome') 5127 5128 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') 5129 def test_ignore_listener(self): 5130 conn, child_conn = multiprocessing.Pipe() 5131 try: 5132 p = multiprocessing.Process(target=self._test_ignore_listener, 5133 args=(child_conn,)) 5134 p.daemon = True 5135 p.start() 5136 child_conn.close() 5137 address = conn.recv() 5138 time.sleep(0.1) 5139 os.kill(p.pid, signal.SIGUSR1) 5140 time.sleep(0.1) 5141 client = multiprocessing.connection.Client(address) 5142 self.assertEqual(client.recv(), 'welcome') 5143 p.join() 5144 finally: 5145 conn.close() 5146 5147class TestStartMethod(unittest.TestCase): 5148 @classmethod 5149 def _check_context(cls, conn): 5150 conn.send(multiprocessing.get_start_method()) 5151 5152 def check_context(self, ctx): 5153 r, w = ctx.Pipe(duplex=False) 5154 p = ctx.Process(target=self._check_context, args=(w,)) 5155 p.start() 5156 w.close() 5157 child_method = r.recv() 5158 r.close() 5159 p.join() 5160 self.assertEqual(child_method, ctx.get_start_method()) 5161 5162 def test_context(self): 5163 for method in ('fork', 'spawn', 'forkserver'): 5164 try: 5165 ctx = multiprocessing.get_context(method) 5166 except ValueError: 5167 continue 5168 self.assertEqual(ctx.get_start_method(), method) 5169 self.assertIs(ctx.get_context(), ctx) 5170 self.assertRaises(ValueError, ctx.set_start_method, 'spawn') 5171 self.assertRaises(ValueError, ctx.set_start_method, None) 5172 self.check_context(ctx) 5173 5174 def test_set_get(self): 5175 multiprocessing.set_forkserver_preload(PRELOAD) 5176 count = 0 5177 old_method = multiprocessing.get_start_method() 5178 try: 5179 for method in ('fork', 'spawn', 'forkserver'): 5180 try: 5181 multiprocessing.set_start_method(method, force=True) 5182 except ValueError: 5183 continue 5184 self.assertEqual(multiprocessing.get_start_method(), method) 5185 ctx = multiprocessing.get_context() 5186 self.assertEqual(ctx.get_start_method(), method) 5187 self.assertTrue(type(ctx).__name__.lower().startswith(method)) 5188 self.assertTrue( 5189 ctx.Process.__name__.lower().startswith(method)) 5190 self.check_context(multiprocessing) 5191 count += 1 5192 finally: 5193 multiprocessing.set_start_method(old_method, force=True) 5194 self.assertGreaterEqual(count, 1) 5195 5196 def test_get_all(self): 5197 methods = multiprocessing.get_all_start_methods() 5198 if sys.platform == 'win32': 5199 self.assertEqual(methods, ['spawn']) 5200 else: 5201 self.assertTrue(methods == ['fork', 'spawn'] or 5202 methods == ['spawn', 'fork'] or 5203 methods == ['fork', 'spawn', 'forkserver'] or 5204 methods == ['spawn', 'fork', 'forkserver']) 5205 5206 def test_preload_resources(self): 5207 if multiprocessing.get_start_method() != 'forkserver': 5208 self.skipTest("test only relevant for 'forkserver' method") 5209 name = os.path.join(os.path.dirname(__file__), 'mp_preload.py') 5210 rc, out, err = test.support.script_helper.assert_python_ok(name) 5211 out = out.decode() 5212 err = err.decode() 5213 if out.rstrip() != 'ok' or err != '': 5214 print(out) 5215 print(err) 5216 self.fail("failed spawning forkserver or grandchild") 5217 5218 5219@unittest.skipIf(sys.platform == "win32", 5220 "test semantics don't make sense on Windows") 5221class TestResourceTracker(unittest.TestCase): 5222 5223 def test_resource_tracker(self): 5224 # 5225 # Check that killing process does not leak named semaphores 5226 # 5227 cmd = '''if 1: 5228 import time, os, tempfile 5229 import multiprocessing as mp 5230 from multiprocessing import resource_tracker 5231 from multiprocessing.shared_memory import SharedMemory 5232 5233 mp.set_start_method("spawn") 5234 rand = tempfile._RandomNameSequence() 5235 5236 5237 def create_and_register_resource(rtype): 5238 if rtype == "semaphore": 5239 lock = mp.Lock() 5240 return lock, lock._semlock.name 5241 elif rtype == "shared_memory": 5242 sm = SharedMemory(create=True, size=10) 5243 return sm, sm._name 5244 else: 5245 raise ValueError( 5246 "Resource type {{}} not understood".format(rtype)) 5247 5248 5249 resource1, rname1 = create_and_register_resource("{rtype}") 5250 resource2, rname2 = create_and_register_resource("{rtype}") 5251 5252 os.write({w}, rname1.encode("ascii") + b"\\n") 5253 os.write({w}, rname2.encode("ascii") + b"\\n") 5254 5255 time.sleep(10) 5256 ''' 5257 for rtype in resource_tracker._CLEANUP_FUNCS: 5258 with self.subTest(rtype=rtype): 5259 if rtype == "noop": 5260 # Artefact resource type used by the resource_tracker 5261 continue 5262 r, w = os.pipe() 5263 p = subprocess.Popen([sys.executable, 5264 '-E', '-c', cmd.format(w=w, rtype=rtype)], 5265 pass_fds=[w], 5266 stderr=subprocess.PIPE) 5267 os.close(w) 5268 with open(r, 'rb', closefd=True) as f: 5269 name1 = f.readline().rstrip().decode('ascii') 5270 name2 = f.readline().rstrip().decode('ascii') 5271 _resource_unlink(name1, rtype) 5272 p.terminate() 5273 p.wait() 5274 5275 deadline = time.monotonic() + support.LONG_TIMEOUT 5276 while time.monotonic() < deadline: 5277 time.sleep(.5) 5278 try: 5279 _resource_unlink(name2, rtype) 5280 except OSError as e: 5281 # docs say it should be ENOENT, but OSX seems to give 5282 # EINVAL 5283 self.assertIn(e.errno, (errno.ENOENT, errno.EINVAL)) 5284 break 5285 else: 5286 raise AssertionError( 5287 f"A {rtype} resource was leaked after a process was " 5288 f"abruptly terminated.") 5289 err = p.stderr.read().decode('utf-8') 5290 p.stderr.close() 5291 expected = ('resource_tracker: There appear to be 2 leaked {} ' 5292 'objects'.format( 5293 rtype)) 5294 self.assertRegex(err, expected) 5295 self.assertRegex(err, r'resource_tracker: %r: \[Errno' % name1) 5296 5297 def check_resource_tracker_death(self, signum, should_die): 5298 # bpo-31310: if the semaphore tracker process has died, it should 5299 # be restarted implicitly. 5300 from multiprocessing.resource_tracker import _resource_tracker 5301 pid = _resource_tracker._pid 5302 if pid is not None: 5303 os.kill(pid, signal.SIGKILL) 5304 support.wait_process(pid, exitcode=-signal.SIGKILL) 5305 with warnings.catch_warnings(): 5306 warnings.simplefilter("ignore") 5307 _resource_tracker.ensure_running() 5308 pid = _resource_tracker._pid 5309 5310 os.kill(pid, signum) 5311 time.sleep(1.0) # give it time to die 5312 5313 ctx = multiprocessing.get_context("spawn") 5314 with warnings.catch_warnings(record=True) as all_warn: 5315 warnings.simplefilter("always") 5316 sem = ctx.Semaphore() 5317 sem.acquire() 5318 sem.release() 5319 wr = weakref.ref(sem) 5320 # ensure `sem` gets collected, which triggers communication with 5321 # the semaphore tracker 5322 del sem 5323 gc.collect() 5324 self.assertIsNone(wr()) 5325 if should_die: 5326 self.assertEqual(len(all_warn), 1) 5327 the_warn = all_warn[0] 5328 self.assertTrue(issubclass(the_warn.category, UserWarning)) 5329 self.assertTrue("resource_tracker: process died" 5330 in str(the_warn.message)) 5331 else: 5332 self.assertEqual(len(all_warn), 0) 5333 5334 def test_resource_tracker_sigint(self): 5335 # Catchable signal (ignored by semaphore tracker) 5336 self.check_resource_tracker_death(signal.SIGINT, False) 5337 5338 def test_resource_tracker_sigterm(self): 5339 # Catchable signal (ignored by semaphore tracker) 5340 self.check_resource_tracker_death(signal.SIGTERM, False) 5341 5342 def test_resource_tracker_sigkill(self): 5343 # Uncatchable signal. 5344 self.check_resource_tracker_death(signal.SIGKILL, True) 5345 5346 @staticmethod 5347 def _is_resource_tracker_reused(conn, pid): 5348 from multiprocessing.resource_tracker import _resource_tracker 5349 _resource_tracker.ensure_running() 5350 # The pid should be None in the child process, expect for the fork 5351 # context. It should not be a new value. 5352 reused = _resource_tracker._pid in (None, pid) 5353 reused &= _resource_tracker._check_alive() 5354 conn.send(reused) 5355 5356 def test_resource_tracker_reused(self): 5357 from multiprocessing.resource_tracker import _resource_tracker 5358 _resource_tracker.ensure_running() 5359 pid = _resource_tracker._pid 5360 5361 r, w = multiprocessing.Pipe(duplex=False) 5362 p = multiprocessing.Process(target=self._is_resource_tracker_reused, 5363 args=(w, pid)) 5364 p.start() 5365 is_resource_tracker_reused = r.recv() 5366 5367 # Clean up 5368 p.join() 5369 w.close() 5370 r.close() 5371 5372 self.assertTrue(is_resource_tracker_reused) 5373 5374 5375class TestSimpleQueue(unittest.TestCase): 5376 5377 @classmethod 5378 def _test_empty(cls, queue, child_can_start, parent_can_continue): 5379 child_can_start.wait() 5380 # issue 30301, could fail under spawn and forkserver 5381 try: 5382 queue.put(queue.empty()) 5383 queue.put(queue.empty()) 5384 finally: 5385 parent_can_continue.set() 5386 5387 def test_empty(self): 5388 queue = multiprocessing.SimpleQueue() 5389 child_can_start = multiprocessing.Event() 5390 parent_can_continue = multiprocessing.Event() 5391 5392 proc = multiprocessing.Process( 5393 target=self._test_empty, 5394 args=(queue, child_can_start, parent_can_continue) 5395 ) 5396 proc.daemon = True 5397 proc.start() 5398 5399 self.assertTrue(queue.empty()) 5400 5401 child_can_start.set() 5402 parent_can_continue.wait() 5403 5404 self.assertFalse(queue.empty()) 5405 self.assertEqual(queue.get(), True) 5406 self.assertEqual(queue.get(), False) 5407 self.assertTrue(queue.empty()) 5408 5409 proc.join() 5410 5411 def test_close(self): 5412 queue = multiprocessing.SimpleQueue() 5413 queue.close() 5414 # closing a queue twice should not fail 5415 queue.close() 5416 5417 # Test specific to CPython since it tests private attributes 5418 @test.support.cpython_only 5419 def test_closed(self): 5420 queue = multiprocessing.SimpleQueue() 5421 queue.close() 5422 self.assertTrue(queue._reader.closed) 5423 self.assertTrue(queue._writer.closed) 5424 5425 5426class TestPoolNotLeakOnFailure(unittest.TestCase): 5427 5428 def test_release_unused_processes(self): 5429 # Issue #19675: During pool creation, if we can't create a process, 5430 # don't leak already created ones. 5431 will_fail_in = 3 5432 forked_processes = [] 5433 5434 class FailingForkProcess: 5435 def __init__(self, **kwargs): 5436 self.name = 'Fake Process' 5437 self.exitcode = None 5438 self.state = None 5439 forked_processes.append(self) 5440 5441 def start(self): 5442 nonlocal will_fail_in 5443 if will_fail_in <= 0: 5444 raise OSError("Manually induced OSError") 5445 will_fail_in -= 1 5446 self.state = 'started' 5447 5448 def terminate(self): 5449 self.state = 'stopping' 5450 5451 def join(self): 5452 if self.state == 'stopping': 5453 self.state = 'stopped' 5454 5455 def is_alive(self): 5456 return self.state == 'started' or self.state == 'stopping' 5457 5458 with self.assertRaisesRegex(OSError, 'Manually induced OSError'): 5459 p = multiprocessing.pool.Pool(5, context=unittest.mock.MagicMock( 5460 Process=FailingForkProcess)) 5461 p.close() 5462 p.join() 5463 self.assertFalse( 5464 any(process.is_alive() for process in forked_processes)) 5465 5466 5467@hashlib_helper.requires_hashdigest('md5') 5468class TestSyncManagerTypes(unittest.TestCase): 5469 """Test all the types which can be shared between a parent and a 5470 child process by using a manager which acts as an intermediary 5471 between them. 5472 5473 In the following unit-tests the base type is created in the parent 5474 process, the @classmethod represents the worker process and the 5475 shared object is readable and editable between the two. 5476 5477 # The child. 5478 @classmethod 5479 def _test_list(cls, obj): 5480 assert obj[0] == 5 5481 assert obj.append(6) 5482 5483 # The parent. 5484 def test_list(self): 5485 o = self.manager.list() 5486 o.append(5) 5487 self.run_worker(self._test_list, o) 5488 assert o[1] == 6 5489 """ 5490 manager_class = multiprocessing.managers.SyncManager 5491 5492 def setUp(self): 5493 self.manager = self.manager_class() 5494 self.manager.start() 5495 self.proc = None 5496 5497 def tearDown(self): 5498 if self.proc is not None and self.proc.is_alive(): 5499 self.proc.terminate() 5500 self.proc.join() 5501 self.manager.shutdown() 5502 self.manager = None 5503 self.proc = None 5504 5505 @classmethod 5506 def setUpClass(cls): 5507 support.reap_children() 5508 5509 tearDownClass = setUpClass 5510 5511 def wait_proc_exit(self): 5512 # Only the manager process should be returned by active_children() 5513 # but this can take a bit on slow machines, so wait a few seconds 5514 # if there are other children too (see #17395). 5515 join_process(self.proc) 5516 start_time = time.monotonic() 5517 t = 0.01 5518 while len(multiprocessing.active_children()) > 1: 5519 time.sleep(t) 5520 t *= 2 5521 dt = time.monotonic() - start_time 5522 if dt >= 5.0: 5523 test.support.environment_altered = True 5524 support.print_warning(f"multiprocessing.Manager still has " 5525 f"{multiprocessing.active_children()} " 5526 f"active children after {dt} seconds") 5527 break 5528 5529 def run_worker(self, worker, obj): 5530 self.proc = multiprocessing.Process(target=worker, args=(obj, )) 5531 self.proc.daemon = True 5532 self.proc.start() 5533 self.wait_proc_exit() 5534 self.assertEqual(self.proc.exitcode, 0) 5535 5536 @classmethod 5537 def _test_event(cls, obj): 5538 assert obj.is_set() 5539 obj.wait() 5540 obj.clear() 5541 obj.wait(0.001) 5542 5543 def test_event(self): 5544 o = self.manager.Event() 5545 o.set() 5546 self.run_worker(self._test_event, o) 5547 assert not o.is_set() 5548 o.wait(0.001) 5549 5550 @classmethod 5551 def _test_lock(cls, obj): 5552 obj.acquire() 5553 5554 def test_lock(self, lname="Lock"): 5555 o = getattr(self.manager, lname)() 5556 self.run_worker(self._test_lock, o) 5557 o.release() 5558 self.assertRaises(RuntimeError, o.release) # already released 5559 5560 @classmethod 5561 def _test_rlock(cls, obj): 5562 obj.acquire() 5563 obj.release() 5564 5565 def test_rlock(self, lname="Lock"): 5566 o = getattr(self.manager, lname)() 5567 self.run_worker(self._test_rlock, o) 5568 5569 @classmethod 5570 def _test_semaphore(cls, obj): 5571 obj.acquire() 5572 5573 def test_semaphore(self, sname="Semaphore"): 5574 o = getattr(self.manager, sname)() 5575 self.run_worker(self._test_semaphore, o) 5576 o.release() 5577 5578 def test_bounded_semaphore(self): 5579 self.test_semaphore(sname="BoundedSemaphore") 5580 5581 @classmethod 5582 def _test_condition(cls, obj): 5583 obj.acquire() 5584 obj.release() 5585 5586 def test_condition(self): 5587 o = self.manager.Condition() 5588 self.run_worker(self._test_condition, o) 5589 5590 @classmethod 5591 def _test_barrier(cls, obj): 5592 assert obj.parties == 5 5593 obj.reset() 5594 5595 def test_barrier(self): 5596 o = self.manager.Barrier(5) 5597 self.run_worker(self._test_barrier, o) 5598 5599 @classmethod 5600 def _test_pool(cls, obj): 5601 # TODO: fix https://bugs.python.org/issue35919 5602 with obj: 5603 pass 5604 5605 def test_pool(self): 5606 o = self.manager.Pool(processes=4) 5607 self.run_worker(self._test_pool, o) 5608 5609 @classmethod 5610 def _test_queue(cls, obj): 5611 assert obj.qsize() == 2 5612 assert obj.full() 5613 assert not obj.empty() 5614 assert obj.get() == 5 5615 assert not obj.empty() 5616 assert obj.get() == 6 5617 assert obj.empty() 5618 5619 def test_queue(self, qname="Queue"): 5620 o = getattr(self.manager, qname)(2) 5621 o.put(5) 5622 o.put(6) 5623 self.run_worker(self._test_queue, o) 5624 assert o.empty() 5625 assert not o.full() 5626 5627 def test_joinable_queue(self): 5628 self.test_queue("JoinableQueue") 5629 5630 @classmethod 5631 def _test_list(cls, obj): 5632 assert obj[0] == 5 5633 assert obj.count(5) == 1 5634 assert obj.index(5) == 0 5635 obj.sort() 5636 obj.reverse() 5637 for x in obj: 5638 pass 5639 assert len(obj) == 1 5640 assert obj.pop(0) == 5 5641 5642 def test_list(self): 5643 o = self.manager.list() 5644 o.append(5) 5645 self.run_worker(self._test_list, o) 5646 assert not o 5647 self.assertEqual(len(o), 0) 5648 5649 @classmethod 5650 def _test_dict(cls, obj): 5651 assert len(obj) == 1 5652 assert obj['foo'] == 5 5653 assert obj.get('foo') == 5 5654 assert list(obj.items()) == [('foo', 5)] 5655 assert list(obj.keys()) == ['foo'] 5656 assert list(obj.values()) == [5] 5657 assert obj.copy() == {'foo': 5} 5658 assert obj.popitem() == ('foo', 5) 5659 5660 def test_dict(self): 5661 o = self.manager.dict() 5662 o['foo'] = 5 5663 self.run_worker(self._test_dict, o) 5664 assert not o 5665 self.assertEqual(len(o), 0) 5666 5667 @classmethod 5668 def _test_value(cls, obj): 5669 assert obj.value == 1 5670 assert obj.get() == 1 5671 obj.set(2) 5672 5673 def test_value(self): 5674 o = self.manager.Value('i', 1) 5675 self.run_worker(self._test_value, o) 5676 self.assertEqual(o.value, 2) 5677 self.assertEqual(o.get(), 2) 5678 5679 @classmethod 5680 def _test_array(cls, obj): 5681 assert obj[0] == 0 5682 assert obj[1] == 1 5683 assert len(obj) == 2 5684 assert list(obj) == [0, 1] 5685 5686 def test_array(self): 5687 o = self.manager.Array('i', [0, 1]) 5688 self.run_worker(self._test_array, o) 5689 5690 @classmethod 5691 def _test_namespace(cls, obj): 5692 assert obj.x == 0 5693 assert obj.y == 1 5694 5695 def test_namespace(self): 5696 o = self.manager.Namespace() 5697 o.x = 0 5698 o.y = 1 5699 self.run_worker(self._test_namespace, o) 5700 5701 5702class MiscTestCase(unittest.TestCase): 5703 def test__all__(self): 5704 # Just make sure names in not_exported are excluded 5705 support.check__all__(self, multiprocessing, extra=multiprocessing.__all__, 5706 not_exported=['SUBDEBUG', 'SUBWARNING']) 5707 5708 5709# 5710# Mixins 5711# 5712 5713class BaseMixin(object): 5714 @classmethod 5715 def setUpClass(cls): 5716 cls.dangling = (multiprocessing.process._dangling.copy(), 5717 threading._dangling.copy()) 5718 5719 @classmethod 5720 def tearDownClass(cls): 5721 # bpo-26762: Some multiprocessing objects like Pool create reference 5722 # cycles. Trigger a garbage collection to break these cycles. 5723 test.support.gc_collect() 5724 5725 processes = set(multiprocessing.process._dangling) - set(cls.dangling[0]) 5726 if processes: 5727 test.support.environment_altered = True 5728 support.print_warning(f'Dangling processes: {processes}') 5729 processes = None 5730 5731 threads = set(threading._dangling) - set(cls.dangling[1]) 5732 if threads: 5733 test.support.environment_altered = True 5734 support.print_warning(f'Dangling threads: {threads}') 5735 threads = None 5736 5737 5738class ProcessesMixin(BaseMixin): 5739 TYPE = 'processes' 5740 Process = multiprocessing.Process 5741 connection = multiprocessing.connection 5742 current_process = staticmethod(multiprocessing.current_process) 5743 parent_process = staticmethod(multiprocessing.parent_process) 5744 active_children = staticmethod(multiprocessing.active_children) 5745 Pool = staticmethod(multiprocessing.Pool) 5746 Pipe = staticmethod(multiprocessing.Pipe) 5747 Queue = staticmethod(multiprocessing.Queue) 5748 JoinableQueue = staticmethod(multiprocessing.JoinableQueue) 5749 Lock = staticmethod(multiprocessing.Lock) 5750 RLock = staticmethod(multiprocessing.RLock) 5751 Semaphore = staticmethod(multiprocessing.Semaphore) 5752 BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore) 5753 Condition = staticmethod(multiprocessing.Condition) 5754 Event = staticmethod(multiprocessing.Event) 5755 Barrier = staticmethod(multiprocessing.Barrier) 5756 Value = staticmethod(multiprocessing.Value) 5757 Array = staticmethod(multiprocessing.Array) 5758 RawValue = staticmethod(multiprocessing.RawValue) 5759 RawArray = staticmethod(multiprocessing.RawArray) 5760 5761 5762class ManagerMixin(BaseMixin): 5763 TYPE = 'manager' 5764 Process = multiprocessing.Process 5765 Queue = property(operator.attrgetter('manager.Queue')) 5766 JoinableQueue = property(operator.attrgetter('manager.JoinableQueue')) 5767 Lock = property(operator.attrgetter('manager.Lock')) 5768 RLock = property(operator.attrgetter('manager.RLock')) 5769 Semaphore = property(operator.attrgetter('manager.Semaphore')) 5770 BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore')) 5771 Condition = property(operator.attrgetter('manager.Condition')) 5772 Event = property(operator.attrgetter('manager.Event')) 5773 Barrier = property(operator.attrgetter('manager.Barrier')) 5774 Value = property(operator.attrgetter('manager.Value')) 5775 Array = property(operator.attrgetter('manager.Array')) 5776 list = property(operator.attrgetter('manager.list')) 5777 dict = property(operator.attrgetter('manager.dict')) 5778 Namespace = property(operator.attrgetter('manager.Namespace')) 5779 5780 @classmethod 5781 def Pool(cls, *args, **kwds): 5782 return cls.manager.Pool(*args, **kwds) 5783 5784 @classmethod 5785 def setUpClass(cls): 5786 super().setUpClass() 5787 cls.manager = multiprocessing.Manager() 5788 5789 @classmethod 5790 def tearDownClass(cls): 5791 # only the manager process should be returned by active_children() 5792 # but this can take a bit on slow machines, so wait a few seconds 5793 # if there are other children too (see #17395) 5794 start_time = time.monotonic() 5795 t = 0.01 5796 while len(multiprocessing.active_children()) > 1: 5797 time.sleep(t) 5798 t *= 2 5799 dt = time.monotonic() - start_time 5800 if dt >= 5.0: 5801 test.support.environment_altered = True 5802 support.print_warning(f"multiprocessing.Manager still has " 5803 f"{multiprocessing.active_children()} " 5804 f"active children after {dt} seconds") 5805 break 5806 5807 gc.collect() # do garbage collection 5808 if cls.manager._number_of_objects() != 0: 5809 # This is not really an error since some tests do not 5810 # ensure that all processes which hold a reference to a 5811 # managed object have been joined. 5812 test.support.environment_altered = True 5813 support.print_warning('Shared objects which still exist ' 5814 'at manager shutdown:') 5815 support.print_warning(cls.manager._debug_info()) 5816 cls.manager.shutdown() 5817 cls.manager.join() 5818 cls.manager = None 5819 5820 super().tearDownClass() 5821 5822 5823class ThreadsMixin(BaseMixin): 5824 TYPE = 'threads' 5825 Process = multiprocessing.dummy.Process 5826 connection = multiprocessing.dummy.connection 5827 current_process = staticmethod(multiprocessing.dummy.current_process) 5828 active_children = staticmethod(multiprocessing.dummy.active_children) 5829 Pool = staticmethod(multiprocessing.dummy.Pool) 5830 Pipe = staticmethod(multiprocessing.dummy.Pipe) 5831 Queue = staticmethod(multiprocessing.dummy.Queue) 5832 JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue) 5833 Lock = staticmethod(multiprocessing.dummy.Lock) 5834 RLock = staticmethod(multiprocessing.dummy.RLock) 5835 Semaphore = staticmethod(multiprocessing.dummy.Semaphore) 5836 BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore) 5837 Condition = staticmethod(multiprocessing.dummy.Condition) 5838 Event = staticmethod(multiprocessing.dummy.Event) 5839 Barrier = staticmethod(multiprocessing.dummy.Barrier) 5840 Value = staticmethod(multiprocessing.dummy.Value) 5841 Array = staticmethod(multiprocessing.dummy.Array) 5842 5843# 5844# Functions used to create test cases from the base ones in this module 5845# 5846 5847def install_tests_in_module_dict(remote_globs, start_method): 5848 __module__ = remote_globs['__name__'] 5849 local_globs = globals() 5850 ALL_TYPES = {'processes', 'threads', 'manager'} 5851 5852 for name, base in local_globs.items(): 5853 if not isinstance(base, type): 5854 continue 5855 if issubclass(base, BaseTestCase): 5856 if base is BaseTestCase: 5857 continue 5858 assert set(base.ALLOWED_TYPES) <= ALL_TYPES, base.ALLOWED_TYPES 5859 for type_ in base.ALLOWED_TYPES: 5860 newname = 'With' + type_.capitalize() + name[1:] 5861 Mixin = local_globs[type_.capitalize() + 'Mixin'] 5862 class Temp(base, Mixin, unittest.TestCase): 5863 pass 5864 if type_ == 'manager': 5865 Temp = hashlib_helper.requires_hashdigest('md5')(Temp) 5866 Temp.__name__ = Temp.__qualname__ = newname 5867 Temp.__module__ = __module__ 5868 remote_globs[newname] = Temp 5869 elif issubclass(base, unittest.TestCase): 5870 class Temp(base, object): 5871 pass 5872 Temp.__name__ = Temp.__qualname__ = name 5873 Temp.__module__ = __module__ 5874 remote_globs[name] = Temp 5875 5876 dangling = [None, None] 5877 old_start_method = [None] 5878 5879 def setUpModule(): 5880 multiprocessing.set_forkserver_preload(PRELOAD) 5881 multiprocessing.process._cleanup() 5882 dangling[0] = multiprocessing.process._dangling.copy() 5883 dangling[1] = threading._dangling.copy() 5884 old_start_method[0] = multiprocessing.get_start_method(allow_none=True) 5885 try: 5886 multiprocessing.set_start_method(start_method, force=True) 5887 except ValueError: 5888 raise unittest.SkipTest(start_method + 5889 ' start method not supported') 5890 5891 if sys.platform.startswith("linux"): 5892 try: 5893 lock = multiprocessing.RLock() 5894 except OSError: 5895 raise unittest.SkipTest("OSError raises on RLock creation, " 5896 "see issue 3111!") 5897 check_enough_semaphores() 5898 util.get_temp_dir() # creates temp directory 5899 multiprocessing.get_logger().setLevel(LOG_LEVEL) 5900 5901 def tearDownModule(): 5902 need_sleep = False 5903 5904 # bpo-26762: Some multiprocessing objects like Pool create reference 5905 # cycles. Trigger a garbage collection to break these cycles. 5906 test.support.gc_collect() 5907 5908 multiprocessing.set_start_method(old_start_method[0], force=True) 5909 # pause a bit so we don't get warning about dangling threads/processes 5910 processes = set(multiprocessing.process._dangling) - set(dangling[0]) 5911 if processes: 5912 need_sleep = True 5913 test.support.environment_altered = True 5914 support.print_warning(f'Dangling processes: {processes}') 5915 processes = None 5916 5917 threads = set(threading._dangling) - set(dangling[1]) 5918 if threads: 5919 need_sleep = True 5920 test.support.environment_altered = True 5921 support.print_warning(f'Dangling threads: {threads}') 5922 threads = None 5923 5924 # Sleep 500 ms to give time to child processes to complete. 5925 if need_sleep: 5926 time.sleep(0.5) 5927 5928 multiprocessing.util._cleanup_tests() 5929 5930 remote_globs['setUpModule'] = setUpModule 5931 remote_globs['tearDownModule'] = tearDownModule 5932