1# 2# Unit tests for the multiprocessing package 3# 4 5import unittest 6import unittest.mock 7import queue as pyqueue 8import time 9import io 10import itertools 11import sys 12import os 13import gc 14import errno 15import signal 16import array 17import socket 18import random 19import logging 20import subprocess 21import struct 22import operator 23import pickle 24import weakref 25import warnings 26import test.support 27import test.support.script_helper 28from test import support 29from test.support import hashlib_helper 30from test.support import import_helper 31from test.support import os_helper 32from test.support import socket_helper 33from test.support import threading_helper 34from test.support import warnings_helper 35 36 37# Skip tests if _multiprocessing wasn't built. 38_multiprocessing = import_helper.import_module('_multiprocessing') 39# Skip tests if sem_open implementation is broken. 40support.skip_if_broken_multiprocessing_synchronize() 41import threading 42 43import multiprocessing.connection 44import multiprocessing.dummy 45import multiprocessing.heap 46import multiprocessing.managers 47import multiprocessing.pool 48import multiprocessing.queues 49 50from multiprocessing import util 51 52try: 53 from multiprocessing import reduction 54 HAS_REDUCTION = reduction.HAVE_SEND_HANDLE 55except ImportError: 56 HAS_REDUCTION = False 57 58try: 59 from multiprocessing.sharedctypes import Value, copy 60 HAS_SHAREDCTYPES = True 61except ImportError: 62 HAS_SHAREDCTYPES = False 63 64try: 65 from multiprocessing import shared_memory 66 HAS_SHMEM = True 67except ImportError: 68 HAS_SHMEM = False 69 70try: 71 import msvcrt 72except ImportError: 73 msvcrt = None 74 75 76def latin(s): 77 return s.encode('latin') 78 79 80def close_queue(queue): 81 if isinstance(queue, multiprocessing.queues.Queue): 82 queue.close() 83 queue.join_thread() 84 85 86def join_process(process): 87 # Since multiprocessing.Process has the same API than threading.Thread 88 # (join() and is_alive(), the support function can be reused 89 threading_helper.join_thread(process) 90 91 92if os.name == "posix": 93 from multiprocessing import resource_tracker 94 95 def _resource_unlink(name, rtype): 96 resource_tracker._CLEANUP_FUNCS[rtype](name) 97 98 99# 100# Constants 101# 102 103LOG_LEVEL = util.SUBWARNING 104#LOG_LEVEL = logging.DEBUG 105 106DELTA = 0.1 107CHECK_TIMINGS = False # making true makes tests take a lot longer 108 # and can sometimes cause some non-serious 109 # failures because some calls block a bit 110 # longer than expected 111if CHECK_TIMINGS: 112 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4 113else: 114 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1 115 116HAVE_GETVALUE = not getattr(_multiprocessing, 117 'HAVE_BROKEN_SEM_GETVALUE', False) 118 119WIN32 = (sys.platform == "win32") 120 121from multiprocessing.connection import wait 122 123def wait_for_handle(handle, timeout): 124 if timeout is not None and timeout < 0.0: 125 timeout = None 126 return wait([handle], timeout) 127 128try: 129 MAXFD = os.sysconf("SC_OPEN_MAX") 130except: 131 MAXFD = 256 132 133# To speed up tests when using the forkserver, we can preload these: 134PRELOAD = ['__main__', 'test.test_multiprocessing_forkserver'] 135 136# 137# Some tests require ctypes 138# 139 140try: 141 from ctypes import Structure, c_int, c_double, c_longlong 142except ImportError: 143 Structure = object 144 c_int = c_double = c_longlong = None 145 146 147def check_enough_semaphores(): 148 """Check that the system supports enough semaphores to run the test.""" 149 # minimum number of semaphores available according to POSIX 150 nsems_min = 256 151 try: 152 nsems = os.sysconf("SC_SEM_NSEMS_MAX") 153 except (AttributeError, ValueError): 154 # sysconf not available or setting not available 155 return 156 if nsems == -1 or nsems >= nsems_min: 157 return 158 raise unittest.SkipTest("The OS doesn't support enough semaphores " 159 "to run the test (required: %d)." % nsems_min) 160 161 162# 163# Creates a wrapper for a function which records the time it takes to finish 164# 165 166class TimingWrapper(object): 167 168 def __init__(self, func): 169 self.func = func 170 self.elapsed = None 171 172 def __call__(self, *args, **kwds): 173 t = time.monotonic() 174 try: 175 return self.func(*args, **kwds) 176 finally: 177 self.elapsed = time.monotonic() - t 178 179# 180# Base class for test cases 181# 182 183class BaseTestCase(object): 184 185 ALLOWED_TYPES = ('processes', 'manager', 'threads') 186 187 def assertTimingAlmostEqual(self, a, b): 188 if CHECK_TIMINGS: 189 self.assertAlmostEqual(a, b, 1) 190 191 def assertReturnsIfImplemented(self, value, func, *args): 192 try: 193 res = func(*args) 194 except NotImplementedError: 195 pass 196 else: 197 return self.assertEqual(value, res) 198 199 # For the sanity of Windows users, rather than crashing or freezing in 200 # multiple ways. 201 def __reduce__(self, *args): 202 raise NotImplementedError("shouldn't try to pickle a test case") 203 204 __reduce_ex__ = __reduce__ 205 206# 207# Return the value of a semaphore 208# 209 210def get_value(self): 211 try: 212 return self.get_value() 213 except AttributeError: 214 try: 215 return self._Semaphore__value 216 except AttributeError: 217 try: 218 return self._value 219 except AttributeError: 220 raise NotImplementedError 221 222# 223# Testcases 224# 225 226class DummyCallable: 227 def __call__(self, q, c): 228 assert isinstance(c, DummyCallable) 229 q.put(5) 230 231 232class _TestProcess(BaseTestCase): 233 234 ALLOWED_TYPES = ('processes', 'threads') 235 236 def test_current(self): 237 if self.TYPE == 'threads': 238 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 239 240 current = self.current_process() 241 authkey = current.authkey 242 243 self.assertTrue(current.is_alive()) 244 self.assertTrue(not current.daemon) 245 self.assertIsInstance(authkey, bytes) 246 self.assertTrue(len(authkey) > 0) 247 self.assertEqual(current.ident, os.getpid()) 248 self.assertEqual(current.exitcode, None) 249 250 def test_daemon_argument(self): 251 if self.TYPE == "threads": 252 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 253 254 # By default uses the current process's daemon flag. 255 proc0 = self.Process(target=self._test) 256 self.assertEqual(proc0.daemon, self.current_process().daemon) 257 proc1 = self.Process(target=self._test, daemon=True) 258 self.assertTrue(proc1.daemon) 259 proc2 = self.Process(target=self._test, daemon=False) 260 self.assertFalse(proc2.daemon) 261 262 @classmethod 263 def _test(cls, q, *args, **kwds): 264 current = cls.current_process() 265 q.put(args) 266 q.put(kwds) 267 q.put(current.name) 268 if cls.TYPE != 'threads': 269 q.put(bytes(current.authkey)) 270 q.put(current.pid) 271 272 def test_parent_process_attributes(self): 273 if self.TYPE == "threads": 274 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 275 276 self.assertIsNone(self.parent_process()) 277 278 rconn, wconn = self.Pipe(duplex=False) 279 p = self.Process(target=self._test_send_parent_process, args=(wconn,)) 280 p.start() 281 p.join() 282 parent_pid, parent_name = rconn.recv() 283 self.assertEqual(parent_pid, self.current_process().pid) 284 self.assertEqual(parent_pid, os.getpid()) 285 self.assertEqual(parent_name, self.current_process().name) 286 287 @classmethod 288 def _test_send_parent_process(cls, wconn): 289 from multiprocessing.process import parent_process 290 wconn.send([parent_process().pid, parent_process().name]) 291 292 def test_parent_process(self): 293 if self.TYPE == "threads": 294 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 295 296 # Launch a child process. Make it launch a grandchild process. Kill the 297 # child process and make sure that the grandchild notices the death of 298 # its parent (a.k.a the child process). 299 rconn, wconn = self.Pipe(duplex=False) 300 p = self.Process( 301 target=self._test_create_grandchild_process, args=(wconn, )) 302 p.start() 303 304 if not rconn.poll(timeout=support.LONG_TIMEOUT): 305 raise AssertionError("Could not communicate with child process") 306 parent_process_status = rconn.recv() 307 self.assertEqual(parent_process_status, "alive") 308 309 p.terminate() 310 p.join() 311 312 if not rconn.poll(timeout=support.LONG_TIMEOUT): 313 raise AssertionError("Could not communicate with child process") 314 parent_process_status = rconn.recv() 315 self.assertEqual(parent_process_status, "not alive") 316 317 @classmethod 318 def _test_create_grandchild_process(cls, wconn): 319 p = cls.Process(target=cls._test_report_parent_status, args=(wconn, )) 320 p.start() 321 time.sleep(300) 322 323 @classmethod 324 def _test_report_parent_status(cls, wconn): 325 from multiprocessing.process import parent_process 326 wconn.send("alive" if parent_process().is_alive() else "not alive") 327 parent_process().join(timeout=support.SHORT_TIMEOUT) 328 wconn.send("alive" if parent_process().is_alive() else "not alive") 329 330 def test_process(self): 331 q = self.Queue(1) 332 e = self.Event() 333 args = (q, 1, 2) 334 kwargs = {'hello':23, 'bye':2.54} 335 name = 'SomeProcess' 336 p = self.Process( 337 target=self._test, args=args, kwargs=kwargs, name=name 338 ) 339 p.daemon = True 340 current = self.current_process() 341 342 if self.TYPE != 'threads': 343 self.assertEqual(p.authkey, current.authkey) 344 self.assertEqual(p.is_alive(), False) 345 self.assertEqual(p.daemon, True) 346 self.assertNotIn(p, self.active_children()) 347 self.assertTrue(type(self.active_children()) is list) 348 self.assertEqual(p.exitcode, None) 349 350 p.start() 351 352 self.assertEqual(p.exitcode, None) 353 self.assertEqual(p.is_alive(), True) 354 self.assertIn(p, self.active_children()) 355 356 self.assertEqual(q.get(), args[1:]) 357 self.assertEqual(q.get(), kwargs) 358 self.assertEqual(q.get(), p.name) 359 if self.TYPE != 'threads': 360 self.assertEqual(q.get(), current.authkey) 361 self.assertEqual(q.get(), p.pid) 362 363 p.join() 364 365 self.assertEqual(p.exitcode, 0) 366 self.assertEqual(p.is_alive(), False) 367 self.assertNotIn(p, self.active_children()) 368 close_queue(q) 369 370 @unittest.skipUnless(threading._HAVE_THREAD_NATIVE_ID, "needs native_id") 371 def test_process_mainthread_native_id(self): 372 if self.TYPE == 'threads': 373 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 374 375 current_mainthread_native_id = threading.main_thread().native_id 376 377 q = self.Queue(1) 378 p = self.Process(target=self._test_process_mainthread_native_id, args=(q,)) 379 p.start() 380 381 child_mainthread_native_id = q.get() 382 p.join() 383 close_queue(q) 384 385 self.assertNotEqual(current_mainthread_native_id, child_mainthread_native_id) 386 387 @classmethod 388 def _test_process_mainthread_native_id(cls, q): 389 mainthread_native_id = threading.main_thread().native_id 390 q.put(mainthread_native_id) 391 392 @classmethod 393 def _sleep_some(cls): 394 time.sleep(100) 395 396 @classmethod 397 def _test_sleep(cls, delay): 398 time.sleep(delay) 399 400 def _kill_process(self, meth): 401 if self.TYPE == 'threads': 402 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 403 404 p = self.Process(target=self._sleep_some) 405 p.daemon = True 406 p.start() 407 408 self.assertEqual(p.is_alive(), True) 409 self.assertIn(p, self.active_children()) 410 self.assertEqual(p.exitcode, None) 411 412 join = TimingWrapper(p.join) 413 414 self.assertEqual(join(0), None) 415 self.assertTimingAlmostEqual(join.elapsed, 0.0) 416 self.assertEqual(p.is_alive(), True) 417 418 self.assertEqual(join(-1), None) 419 self.assertTimingAlmostEqual(join.elapsed, 0.0) 420 self.assertEqual(p.is_alive(), True) 421 422 # XXX maybe terminating too soon causes the problems on Gentoo... 423 time.sleep(1) 424 425 meth(p) 426 427 if hasattr(signal, 'alarm'): 428 # On the Gentoo buildbot waitpid() often seems to block forever. 429 # We use alarm() to interrupt it if it blocks for too long. 430 def handler(*args): 431 raise RuntimeError('join took too long: %s' % p) 432 old_handler = signal.signal(signal.SIGALRM, handler) 433 try: 434 signal.alarm(10) 435 self.assertEqual(join(), None) 436 finally: 437 signal.alarm(0) 438 signal.signal(signal.SIGALRM, old_handler) 439 else: 440 self.assertEqual(join(), None) 441 442 self.assertTimingAlmostEqual(join.elapsed, 0.0) 443 444 self.assertEqual(p.is_alive(), False) 445 self.assertNotIn(p, self.active_children()) 446 447 p.join() 448 449 return p.exitcode 450 451 def test_terminate(self): 452 exitcode = self._kill_process(multiprocessing.Process.terminate) 453 if os.name != 'nt': 454 self.assertEqual(exitcode, -signal.SIGTERM) 455 456 def test_kill(self): 457 exitcode = self._kill_process(multiprocessing.Process.kill) 458 if os.name != 'nt': 459 self.assertEqual(exitcode, -signal.SIGKILL) 460 461 def test_cpu_count(self): 462 try: 463 cpus = multiprocessing.cpu_count() 464 except NotImplementedError: 465 cpus = 1 466 self.assertTrue(type(cpus) is int) 467 self.assertTrue(cpus >= 1) 468 469 def test_active_children(self): 470 self.assertEqual(type(self.active_children()), list) 471 472 p = self.Process(target=time.sleep, args=(DELTA,)) 473 self.assertNotIn(p, self.active_children()) 474 475 p.daemon = True 476 p.start() 477 self.assertIn(p, self.active_children()) 478 479 p.join() 480 self.assertNotIn(p, self.active_children()) 481 482 @classmethod 483 def _test_recursion(cls, wconn, id): 484 wconn.send(id) 485 if len(id) < 2: 486 for i in range(2): 487 p = cls.Process( 488 target=cls._test_recursion, args=(wconn, id+[i]) 489 ) 490 p.start() 491 p.join() 492 493 def test_recursion(self): 494 rconn, wconn = self.Pipe(duplex=False) 495 self._test_recursion(wconn, []) 496 497 time.sleep(DELTA) 498 result = [] 499 while rconn.poll(): 500 result.append(rconn.recv()) 501 502 expected = [ 503 [], 504 [0], 505 [0, 0], 506 [0, 1], 507 [1], 508 [1, 0], 509 [1, 1] 510 ] 511 self.assertEqual(result, expected) 512 513 @classmethod 514 def _test_sentinel(cls, event): 515 event.wait(10.0) 516 517 def test_sentinel(self): 518 if self.TYPE == "threads": 519 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 520 event = self.Event() 521 p = self.Process(target=self._test_sentinel, args=(event,)) 522 with self.assertRaises(ValueError): 523 p.sentinel 524 p.start() 525 self.addCleanup(p.join) 526 sentinel = p.sentinel 527 self.assertIsInstance(sentinel, int) 528 self.assertFalse(wait_for_handle(sentinel, timeout=0.0)) 529 event.set() 530 p.join() 531 self.assertTrue(wait_for_handle(sentinel, timeout=1)) 532 533 @classmethod 534 def _test_close(cls, rc=0, q=None): 535 if q is not None: 536 q.get() 537 sys.exit(rc) 538 539 def test_close(self): 540 if self.TYPE == "threads": 541 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 542 q = self.Queue() 543 p = self.Process(target=self._test_close, kwargs={'q': q}) 544 p.daemon = True 545 p.start() 546 self.assertEqual(p.is_alive(), True) 547 # Child is still alive, cannot close 548 with self.assertRaises(ValueError): 549 p.close() 550 551 q.put(None) 552 p.join() 553 self.assertEqual(p.is_alive(), False) 554 self.assertEqual(p.exitcode, 0) 555 p.close() 556 with self.assertRaises(ValueError): 557 p.is_alive() 558 with self.assertRaises(ValueError): 559 p.join() 560 with self.assertRaises(ValueError): 561 p.terminate() 562 p.close() 563 564 wr = weakref.ref(p) 565 del p 566 gc.collect() 567 self.assertIs(wr(), None) 568 569 close_queue(q) 570 571 def test_many_processes(self): 572 if self.TYPE == 'threads': 573 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 574 575 sm = multiprocessing.get_start_method() 576 N = 5 if sm == 'spawn' else 100 577 578 # Try to overwhelm the forkserver loop with events 579 procs = [self.Process(target=self._test_sleep, args=(0.01,)) 580 for i in range(N)] 581 for p in procs: 582 p.start() 583 for p in procs: 584 join_process(p) 585 for p in procs: 586 self.assertEqual(p.exitcode, 0) 587 588 procs = [self.Process(target=self._sleep_some) 589 for i in range(N)] 590 for p in procs: 591 p.start() 592 time.sleep(0.001) # let the children start... 593 for p in procs: 594 p.terminate() 595 for p in procs: 596 join_process(p) 597 if os.name != 'nt': 598 exitcodes = [-signal.SIGTERM] 599 if sys.platform == 'darwin': 600 # bpo-31510: On macOS, killing a freshly started process with 601 # SIGTERM sometimes kills the process with SIGKILL. 602 exitcodes.append(-signal.SIGKILL) 603 for p in procs: 604 self.assertIn(p.exitcode, exitcodes) 605 606 def test_lose_target_ref(self): 607 c = DummyCallable() 608 wr = weakref.ref(c) 609 q = self.Queue() 610 p = self.Process(target=c, args=(q, c)) 611 del c 612 p.start() 613 p.join() 614 gc.collect() # For PyPy or other GCs. 615 self.assertIs(wr(), None) 616 self.assertEqual(q.get(), 5) 617 close_queue(q) 618 619 @classmethod 620 def _test_child_fd_inflation(self, evt, q): 621 q.put(os_helper.fd_count()) 622 evt.wait() 623 624 def test_child_fd_inflation(self): 625 # Number of fds in child processes should not grow with the 626 # number of running children. 627 if self.TYPE == 'threads': 628 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 629 630 sm = multiprocessing.get_start_method() 631 if sm == 'fork': 632 # The fork method by design inherits all fds from the parent, 633 # trying to go against it is a lost battle 634 self.skipTest('test not appropriate for {}'.format(sm)) 635 636 N = 5 637 evt = self.Event() 638 q = self.Queue() 639 640 procs = [self.Process(target=self._test_child_fd_inflation, args=(evt, q)) 641 for i in range(N)] 642 for p in procs: 643 p.start() 644 645 try: 646 fd_counts = [q.get() for i in range(N)] 647 self.assertEqual(len(set(fd_counts)), 1, fd_counts) 648 649 finally: 650 evt.set() 651 for p in procs: 652 p.join() 653 close_queue(q) 654 655 @classmethod 656 def _test_wait_for_threads(self, evt): 657 def func1(): 658 time.sleep(0.5) 659 evt.set() 660 661 def func2(): 662 time.sleep(20) 663 evt.clear() 664 665 threading.Thread(target=func1).start() 666 threading.Thread(target=func2, daemon=True).start() 667 668 def test_wait_for_threads(self): 669 # A child process should wait for non-daemonic threads to end 670 # before exiting 671 if self.TYPE == 'threads': 672 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 673 674 evt = self.Event() 675 proc = self.Process(target=self._test_wait_for_threads, args=(evt,)) 676 proc.start() 677 proc.join() 678 self.assertTrue(evt.is_set()) 679 680 @classmethod 681 def _test_error_on_stdio_flush(self, evt, break_std_streams={}): 682 for stream_name, action in break_std_streams.items(): 683 if action == 'close': 684 stream = io.StringIO() 685 stream.close() 686 else: 687 assert action == 'remove' 688 stream = None 689 setattr(sys, stream_name, None) 690 evt.set() 691 692 def test_error_on_stdio_flush_1(self): 693 # Check that Process works with broken standard streams 694 streams = [io.StringIO(), None] 695 streams[0].close() 696 for stream_name in ('stdout', 'stderr'): 697 for stream in streams: 698 old_stream = getattr(sys, stream_name) 699 setattr(sys, stream_name, stream) 700 try: 701 evt = self.Event() 702 proc = self.Process(target=self._test_error_on_stdio_flush, 703 args=(evt,)) 704 proc.start() 705 proc.join() 706 self.assertTrue(evt.is_set()) 707 self.assertEqual(proc.exitcode, 0) 708 finally: 709 setattr(sys, stream_name, old_stream) 710 711 def test_error_on_stdio_flush_2(self): 712 # Same as test_error_on_stdio_flush_1(), but standard streams are 713 # broken by the child process 714 for stream_name in ('stdout', 'stderr'): 715 for action in ('close', 'remove'): 716 old_stream = getattr(sys, stream_name) 717 try: 718 evt = self.Event() 719 proc = self.Process(target=self._test_error_on_stdio_flush, 720 args=(evt, {stream_name: action})) 721 proc.start() 722 proc.join() 723 self.assertTrue(evt.is_set()) 724 self.assertEqual(proc.exitcode, 0) 725 finally: 726 setattr(sys, stream_name, old_stream) 727 728 @classmethod 729 def _sleep_and_set_event(self, evt, delay=0.0): 730 time.sleep(delay) 731 evt.set() 732 733 def check_forkserver_death(self, signum): 734 # bpo-31308: if the forkserver process has died, we should still 735 # be able to create and run new Process instances (the forkserver 736 # is implicitly restarted). 737 if self.TYPE == 'threads': 738 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 739 sm = multiprocessing.get_start_method() 740 if sm != 'forkserver': 741 # The fork method by design inherits all fds from the parent, 742 # trying to go against it is a lost battle 743 self.skipTest('test not appropriate for {}'.format(sm)) 744 745 from multiprocessing.forkserver import _forkserver 746 _forkserver.ensure_running() 747 748 # First process sleeps 500 ms 749 delay = 0.5 750 751 evt = self.Event() 752 proc = self.Process(target=self._sleep_and_set_event, args=(evt, delay)) 753 proc.start() 754 755 pid = _forkserver._forkserver_pid 756 os.kill(pid, signum) 757 # give time to the fork server to die and time to proc to complete 758 time.sleep(delay * 2.0) 759 760 evt2 = self.Event() 761 proc2 = self.Process(target=self._sleep_and_set_event, args=(evt2,)) 762 proc2.start() 763 proc2.join() 764 self.assertTrue(evt2.is_set()) 765 self.assertEqual(proc2.exitcode, 0) 766 767 proc.join() 768 self.assertTrue(evt.is_set()) 769 self.assertIn(proc.exitcode, (0, 255)) 770 771 def test_forkserver_sigint(self): 772 # Catchable signal 773 self.check_forkserver_death(signal.SIGINT) 774 775 def test_forkserver_sigkill(self): 776 # Uncatchable signal 777 if os.name != 'nt': 778 self.check_forkserver_death(signal.SIGKILL) 779 780 781# 782# 783# 784 785class _UpperCaser(multiprocessing.Process): 786 787 def __init__(self): 788 multiprocessing.Process.__init__(self) 789 self.child_conn, self.parent_conn = multiprocessing.Pipe() 790 791 def run(self): 792 self.parent_conn.close() 793 for s in iter(self.child_conn.recv, None): 794 self.child_conn.send(s.upper()) 795 self.child_conn.close() 796 797 def submit(self, s): 798 assert type(s) is str 799 self.parent_conn.send(s) 800 return self.parent_conn.recv() 801 802 def stop(self): 803 self.parent_conn.send(None) 804 self.parent_conn.close() 805 self.child_conn.close() 806 807class _TestSubclassingProcess(BaseTestCase): 808 809 ALLOWED_TYPES = ('processes',) 810 811 def test_subclassing(self): 812 uppercaser = _UpperCaser() 813 uppercaser.daemon = True 814 uppercaser.start() 815 self.assertEqual(uppercaser.submit('hello'), 'HELLO') 816 self.assertEqual(uppercaser.submit('world'), 'WORLD') 817 uppercaser.stop() 818 uppercaser.join() 819 820 def test_stderr_flush(self): 821 # sys.stderr is flushed at process shutdown (issue #13812) 822 if self.TYPE == "threads": 823 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 824 825 testfn = os_helper.TESTFN 826 self.addCleanup(os_helper.unlink, testfn) 827 proc = self.Process(target=self._test_stderr_flush, args=(testfn,)) 828 proc.start() 829 proc.join() 830 with open(testfn, encoding="utf-8") as f: 831 err = f.read() 832 # The whole traceback was printed 833 self.assertIn("ZeroDivisionError", err) 834 self.assertIn("test_multiprocessing.py", err) 835 self.assertIn("1/0 # MARKER", err) 836 837 @classmethod 838 def _test_stderr_flush(cls, testfn): 839 fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL) 840 sys.stderr = open(fd, 'w', encoding="utf-8", closefd=False) 841 1/0 # MARKER 842 843 844 @classmethod 845 def _test_sys_exit(cls, reason, testfn): 846 fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL) 847 sys.stderr = open(fd, 'w', encoding="utf-8", closefd=False) 848 sys.exit(reason) 849 850 def test_sys_exit(self): 851 # See Issue 13854 852 if self.TYPE == 'threads': 853 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 854 855 testfn = os_helper.TESTFN 856 self.addCleanup(os_helper.unlink, testfn) 857 858 for reason in ( 859 [1, 2, 3], 860 'ignore this', 861 ): 862 p = self.Process(target=self._test_sys_exit, args=(reason, testfn)) 863 p.daemon = True 864 p.start() 865 join_process(p) 866 self.assertEqual(p.exitcode, 1) 867 868 with open(testfn, encoding="utf-8") as f: 869 content = f.read() 870 self.assertEqual(content.rstrip(), str(reason)) 871 872 os.unlink(testfn) 873 874 cases = [ 875 ((True,), 1), 876 ((False,), 0), 877 ((8,), 8), 878 ((None,), 0), 879 ((), 0), 880 ] 881 882 for args, expected in cases: 883 with self.subTest(args=args): 884 p = self.Process(target=sys.exit, args=args) 885 p.daemon = True 886 p.start() 887 join_process(p) 888 self.assertEqual(p.exitcode, expected) 889 890# 891# 892# 893 894def queue_empty(q): 895 if hasattr(q, 'empty'): 896 return q.empty() 897 else: 898 return q.qsize() == 0 899 900def queue_full(q, maxsize): 901 if hasattr(q, 'full'): 902 return q.full() 903 else: 904 return q.qsize() == maxsize 905 906 907class _TestQueue(BaseTestCase): 908 909 910 @classmethod 911 def _test_put(cls, queue, child_can_start, parent_can_continue): 912 child_can_start.wait() 913 for i in range(6): 914 queue.get() 915 parent_can_continue.set() 916 917 def test_put(self): 918 MAXSIZE = 6 919 queue = self.Queue(maxsize=MAXSIZE) 920 child_can_start = self.Event() 921 parent_can_continue = self.Event() 922 923 proc = self.Process( 924 target=self._test_put, 925 args=(queue, child_can_start, parent_can_continue) 926 ) 927 proc.daemon = True 928 proc.start() 929 930 self.assertEqual(queue_empty(queue), True) 931 self.assertEqual(queue_full(queue, MAXSIZE), False) 932 933 queue.put(1) 934 queue.put(2, True) 935 queue.put(3, True, None) 936 queue.put(4, False) 937 queue.put(5, False, None) 938 queue.put_nowait(6) 939 940 # the values may be in buffer but not yet in pipe so sleep a bit 941 time.sleep(DELTA) 942 943 self.assertEqual(queue_empty(queue), False) 944 self.assertEqual(queue_full(queue, MAXSIZE), True) 945 946 put = TimingWrapper(queue.put) 947 put_nowait = TimingWrapper(queue.put_nowait) 948 949 self.assertRaises(pyqueue.Full, put, 7, False) 950 self.assertTimingAlmostEqual(put.elapsed, 0) 951 952 self.assertRaises(pyqueue.Full, put, 7, False, None) 953 self.assertTimingAlmostEqual(put.elapsed, 0) 954 955 self.assertRaises(pyqueue.Full, put_nowait, 7) 956 self.assertTimingAlmostEqual(put_nowait.elapsed, 0) 957 958 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1) 959 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1) 960 961 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2) 962 self.assertTimingAlmostEqual(put.elapsed, 0) 963 964 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3) 965 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3) 966 967 child_can_start.set() 968 parent_can_continue.wait() 969 970 self.assertEqual(queue_empty(queue), True) 971 self.assertEqual(queue_full(queue, MAXSIZE), False) 972 973 proc.join() 974 close_queue(queue) 975 976 @classmethod 977 def _test_get(cls, queue, child_can_start, parent_can_continue): 978 child_can_start.wait() 979 #queue.put(1) 980 queue.put(2) 981 queue.put(3) 982 queue.put(4) 983 queue.put(5) 984 parent_can_continue.set() 985 986 def test_get(self): 987 queue = self.Queue() 988 child_can_start = self.Event() 989 parent_can_continue = self.Event() 990 991 proc = self.Process( 992 target=self._test_get, 993 args=(queue, child_can_start, parent_can_continue) 994 ) 995 proc.daemon = True 996 proc.start() 997 998 self.assertEqual(queue_empty(queue), True) 999 1000 child_can_start.set() 1001 parent_can_continue.wait() 1002 1003 time.sleep(DELTA) 1004 self.assertEqual(queue_empty(queue), False) 1005 1006 # Hangs unexpectedly, remove for now 1007 #self.assertEqual(queue.get(), 1) 1008 self.assertEqual(queue.get(True, None), 2) 1009 self.assertEqual(queue.get(True), 3) 1010 self.assertEqual(queue.get(timeout=1), 4) 1011 self.assertEqual(queue.get_nowait(), 5) 1012 1013 self.assertEqual(queue_empty(queue), True) 1014 1015 get = TimingWrapper(queue.get) 1016 get_nowait = TimingWrapper(queue.get_nowait) 1017 1018 self.assertRaises(pyqueue.Empty, get, False) 1019 self.assertTimingAlmostEqual(get.elapsed, 0) 1020 1021 self.assertRaises(pyqueue.Empty, get, False, None) 1022 self.assertTimingAlmostEqual(get.elapsed, 0) 1023 1024 self.assertRaises(pyqueue.Empty, get_nowait) 1025 self.assertTimingAlmostEqual(get_nowait.elapsed, 0) 1026 1027 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1) 1028 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1) 1029 1030 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2) 1031 self.assertTimingAlmostEqual(get.elapsed, 0) 1032 1033 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3) 1034 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3) 1035 1036 proc.join() 1037 close_queue(queue) 1038 1039 @classmethod 1040 def _test_fork(cls, queue): 1041 for i in range(10, 20): 1042 queue.put(i) 1043 # note that at this point the items may only be buffered, so the 1044 # process cannot shutdown until the feeder thread has finished 1045 # pushing items onto the pipe. 1046 1047 def test_fork(self): 1048 # Old versions of Queue would fail to create a new feeder 1049 # thread for a forked process if the original process had its 1050 # own feeder thread. This test checks that this no longer 1051 # happens. 1052 1053 queue = self.Queue() 1054 1055 # put items on queue so that main process starts a feeder thread 1056 for i in range(10): 1057 queue.put(i) 1058 1059 # wait to make sure thread starts before we fork a new process 1060 time.sleep(DELTA) 1061 1062 # fork process 1063 p = self.Process(target=self._test_fork, args=(queue,)) 1064 p.daemon = True 1065 p.start() 1066 1067 # check that all expected items are in the queue 1068 for i in range(20): 1069 self.assertEqual(queue.get(), i) 1070 self.assertRaises(pyqueue.Empty, queue.get, False) 1071 1072 p.join() 1073 close_queue(queue) 1074 1075 def test_qsize(self): 1076 q = self.Queue() 1077 try: 1078 self.assertEqual(q.qsize(), 0) 1079 except NotImplementedError: 1080 self.skipTest('qsize method not implemented') 1081 q.put(1) 1082 self.assertEqual(q.qsize(), 1) 1083 q.put(5) 1084 self.assertEqual(q.qsize(), 2) 1085 q.get() 1086 self.assertEqual(q.qsize(), 1) 1087 q.get() 1088 self.assertEqual(q.qsize(), 0) 1089 close_queue(q) 1090 1091 @classmethod 1092 def _test_task_done(cls, q): 1093 for obj in iter(q.get, None): 1094 time.sleep(DELTA) 1095 q.task_done() 1096 1097 def test_task_done(self): 1098 queue = self.JoinableQueue() 1099 1100 workers = [self.Process(target=self._test_task_done, args=(queue,)) 1101 for i in range(4)] 1102 1103 for p in workers: 1104 p.daemon = True 1105 p.start() 1106 1107 for i in range(10): 1108 queue.put(i) 1109 1110 queue.join() 1111 1112 for p in workers: 1113 queue.put(None) 1114 1115 for p in workers: 1116 p.join() 1117 close_queue(queue) 1118 1119 def test_no_import_lock_contention(self): 1120 with os_helper.temp_cwd(): 1121 module_name = 'imported_by_an_imported_module' 1122 with open(module_name + '.py', 'w', encoding="utf-8") as f: 1123 f.write("""if 1: 1124 import multiprocessing 1125 1126 q = multiprocessing.Queue() 1127 q.put('knock knock') 1128 q.get(timeout=3) 1129 q.close() 1130 del q 1131 """) 1132 1133 with import_helper.DirsOnSysPath(os.getcwd()): 1134 try: 1135 __import__(module_name) 1136 except pyqueue.Empty: 1137 self.fail("Probable regression on import lock contention;" 1138 " see Issue #22853") 1139 1140 def test_timeout(self): 1141 q = multiprocessing.Queue() 1142 start = time.monotonic() 1143 self.assertRaises(pyqueue.Empty, q.get, True, 0.200) 1144 delta = time.monotonic() - start 1145 # bpo-30317: Tolerate a delta of 100 ms because of the bad clock 1146 # resolution on Windows (usually 15.6 ms). x86 Windows7 3.x once 1147 # failed because the delta was only 135.8 ms. 1148 self.assertGreaterEqual(delta, 0.100) 1149 close_queue(q) 1150 1151 def test_queue_feeder_donot_stop_onexc(self): 1152 # bpo-30414: verify feeder handles exceptions correctly 1153 if self.TYPE != 'processes': 1154 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1155 1156 class NotSerializable(object): 1157 def __reduce__(self): 1158 raise AttributeError 1159 with test.support.captured_stderr(): 1160 q = self.Queue() 1161 q.put(NotSerializable()) 1162 q.put(True) 1163 self.assertTrue(q.get(timeout=support.SHORT_TIMEOUT)) 1164 close_queue(q) 1165 1166 with test.support.captured_stderr(): 1167 # bpo-33078: verify that the queue size is correctly handled 1168 # on errors. 1169 q = self.Queue(maxsize=1) 1170 q.put(NotSerializable()) 1171 q.put(True) 1172 try: 1173 self.assertEqual(q.qsize(), 1) 1174 except NotImplementedError: 1175 # qsize is not available on all platform as it 1176 # relies on sem_getvalue 1177 pass 1178 self.assertTrue(q.get(timeout=support.SHORT_TIMEOUT)) 1179 # Check that the size of the queue is correct 1180 self.assertTrue(q.empty()) 1181 close_queue(q) 1182 1183 def test_queue_feeder_on_queue_feeder_error(self): 1184 # bpo-30006: verify feeder handles exceptions using the 1185 # _on_queue_feeder_error hook. 1186 if self.TYPE != 'processes': 1187 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1188 1189 class NotSerializable(object): 1190 """Mock unserializable object""" 1191 def __init__(self): 1192 self.reduce_was_called = False 1193 self.on_queue_feeder_error_was_called = False 1194 1195 def __reduce__(self): 1196 self.reduce_was_called = True 1197 raise AttributeError 1198 1199 class SafeQueue(multiprocessing.queues.Queue): 1200 """Queue with overloaded _on_queue_feeder_error hook""" 1201 @staticmethod 1202 def _on_queue_feeder_error(e, obj): 1203 if (isinstance(e, AttributeError) and 1204 isinstance(obj, NotSerializable)): 1205 obj.on_queue_feeder_error_was_called = True 1206 1207 not_serializable_obj = NotSerializable() 1208 # The captured_stderr reduces the noise in the test report 1209 with test.support.captured_stderr(): 1210 q = SafeQueue(ctx=multiprocessing.get_context()) 1211 q.put(not_serializable_obj) 1212 1213 # Verify that q is still functioning correctly 1214 q.put(True) 1215 self.assertTrue(q.get(timeout=support.SHORT_TIMEOUT)) 1216 1217 # Assert that the serialization and the hook have been called correctly 1218 self.assertTrue(not_serializable_obj.reduce_was_called) 1219 self.assertTrue(not_serializable_obj.on_queue_feeder_error_was_called) 1220 1221 def test_closed_queue_put_get_exceptions(self): 1222 for q in multiprocessing.Queue(), multiprocessing.JoinableQueue(): 1223 q.close() 1224 with self.assertRaisesRegex(ValueError, 'is closed'): 1225 q.put('foo') 1226 with self.assertRaisesRegex(ValueError, 'is closed'): 1227 q.get() 1228# 1229# 1230# 1231 1232class _TestLock(BaseTestCase): 1233 1234 def test_lock(self): 1235 lock = self.Lock() 1236 self.assertEqual(lock.acquire(), True) 1237 self.assertEqual(lock.acquire(False), False) 1238 self.assertEqual(lock.release(), None) 1239 self.assertRaises((ValueError, threading.ThreadError), lock.release) 1240 1241 def test_rlock(self): 1242 lock = self.RLock() 1243 self.assertEqual(lock.acquire(), True) 1244 self.assertEqual(lock.acquire(), True) 1245 self.assertEqual(lock.acquire(), True) 1246 self.assertEqual(lock.release(), None) 1247 self.assertEqual(lock.release(), None) 1248 self.assertEqual(lock.release(), None) 1249 self.assertRaises((AssertionError, RuntimeError), lock.release) 1250 1251 def test_lock_context(self): 1252 with self.Lock(): 1253 pass 1254 1255 1256class _TestSemaphore(BaseTestCase): 1257 1258 def _test_semaphore(self, sem): 1259 self.assertReturnsIfImplemented(2, get_value, sem) 1260 self.assertEqual(sem.acquire(), True) 1261 self.assertReturnsIfImplemented(1, get_value, sem) 1262 self.assertEqual(sem.acquire(), True) 1263 self.assertReturnsIfImplemented(0, get_value, sem) 1264 self.assertEqual(sem.acquire(False), False) 1265 self.assertReturnsIfImplemented(0, get_value, sem) 1266 self.assertEqual(sem.release(), None) 1267 self.assertReturnsIfImplemented(1, get_value, sem) 1268 self.assertEqual(sem.release(), None) 1269 self.assertReturnsIfImplemented(2, get_value, sem) 1270 1271 def test_semaphore(self): 1272 sem = self.Semaphore(2) 1273 self._test_semaphore(sem) 1274 self.assertEqual(sem.release(), None) 1275 self.assertReturnsIfImplemented(3, get_value, sem) 1276 self.assertEqual(sem.release(), None) 1277 self.assertReturnsIfImplemented(4, get_value, sem) 1278 1279 def test_bounded_semaphore(self): 1280 sem = self.BoundedSemaphore(2) 1281 self._test_semaphore(sem) 1282 # Currently fails on OS/X 1283 #if HAVE_GETVALUE: 1284 # self.assertRaises(ValueError, sem.release) 1285 # self.assertReturnsIfImplemented(2, get_value, sem) 1286 1287 def test_timeout(self): 1288 if self.TYPE != 'processes': 1289 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1290 1291 sem = self.Semaphore(0) 1292 acquire = TimingWrapper(sem.acquire) 1293 1294 self.assertEqual(acquire(False), False) 1295 self.assertTimingAlmostEqual(acquire.elapsed, 0.0) 1296 1297 self.assertEqual(acquire(False, None), False) 1298 self.assertTimingAlmostEqual(acquire.elapsed, 0.0) 1299 1300 self.assertEqual(acquire(False, TIMEOUT1), False) 1301 self.assertTimingAlmostEqual(acquire.elapsed, 0) 1302 1303 self.assertEqual(acquire(True, TIMEOUT2), False) 1304 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2) 1305 1306 self.assertEqual(acquire(timeout=TIMEOUT3), False) 1307 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3) 1308 1309 1310class _TestCondition(BaseTestCase): 1311 1312 @classmethod 1313 def f(cls, cond, sleeping, woken, timeout=None): 1314 cond.acquire() 1315 sleeping.release() 1316 cond.wait(timeout) 1317 woken.release() 1318 cond.release() 1319 1320 def assertReachesEventually(self, func, value): 1321 for i in range(10): 1322 try: 1323 if func() == value: 1324 break 1325 except NotImplementedError: 1326 break 1327 time.sleep(DELTA) 1328 time.sleep(DELTA) 1329 self.assertReturnsIfImplemented(value, func) 1330 1331 def check_invariant(self, cond): 1332 # this is only supposed to succeed when there are no sleepers 1333 if self.TYPE == 'processes': 1334 try: 1335 sleepers = (cond._sleeping_count.get_value() - 1336 cond._woken_count.get_value()) 1337 self.assertEqual(sleepers, 0) 1338 self.assertEqual(cond._wait_semaphore.get_value(), 0) 1339 except NotImplementedError: 1340 pass 1341 1342 def test_notify(self): 1343 cond = self.Condition() 1344 sleeping = self.Semaphore(0) 1345 woken = self.Semaphore(0) 1346 1347 p = self.Process(target=self.f, args=(cond, sleeping, woken)) 1348 p.daemon = True 1349 p.start() 1350 self.addCleanup(p.join) 1351 1352 p = threading.Thread(target=self.f, args=(cond, sleeping, woken)) 1353 p.daemon = True 1354 p.start() 1355 self.addCleanup(p.join) 1356 1357 # wait for both children to start sleeping 1358 sleeping.acquire() 1359 sleeping.acquire() 1360 1361 # check no process/thread has woken up 1362 time.sleep(DELTA) 1363 self.assertReturnsIfImplemented(0, get_value, woken) 1364 1365 # wake up one process/thread 1366 cond.acquire() 1367 cond.notify() 1368 cond.release() 1369 1370 # check one process/thread has woken up 1371 time.sleep(DELTA) 1372 self.assertReturnsIfImplemented(1, get_value, woken) 1373 1374 # wake up another 1375 cond.acquire() 1376 cond.notify() 1377 cond.release() 1378 1379 # check other has woken up 1380 time.sleep(DELTA) 1381 self.assertReturnsIfImplemented(2, get_value, woken) 1382 1383 # check state is not mucked up 1384 self.check_invariant(cond) 1385 p.join() 1386 1387 def test_notify_all(self): 1388 cond = self.Condition() 1389 sleeping = self.Semaphore(0) 1390 woken = self.Semaphore(0) 1391 1392 # start some threads/processes which will timeout 1393 for i in range(3): 1394 p = self.Process(target=self.f, 1395 args=(cond, sleeping, woken, TIMEOUT1)) 1396 p.daemon = True 1397 p.start() 1398 self.addCleanup(p.join) 1399 1400 t = threading.Thread(target=self.f, 1401 args=(cond, sleeping, woken, TIMEOUT1)) 1402 t.daemon = True 1403 t.start() 1404 self.addCleanup(t.join) 1405 1406 # wait for them all to sleep 1407 for i in range(6): 1408 sleeping.acquire() 1409 1410 # check they have all timed out 1411 for i in range(6): 1412 woken.acquire() 1413 self.assertReturnsIfImplemented(0, get_value, woken) 1414 1415 # check state is not mucked up 1416 self.check_invariant(cond) 1417 1418 # start some more threads/processes 1419 for i in range(3): 1420 p = self.Process(target=self.f, args=(cond, sleeping, woken)) 1421 p.daemon = True 1422 p.start() 1423 self.addCleanup(p.join) 1424 1425 t = threading.Thread(target=self.f, args=(cond, sleeping, woken)) 1426 t.daemon = True 1427 t.start() 1428 self.addCleanup(t.join) 1429 1430 # wait for them to all sleep 1431 for i in range(6): 1432 sleeping.acquire() 1433 1434 # check no process/thread has woken up 1435 time.sleep(DELTA) 1436 self.assertReturnsIfImplemented(0, get_value, woken) 1437 1438 # wake them all up 1439 cond.acquire() 1440 cond.notify_all() 1441 cond.release() 1442 1443 # check they have all woken 1444 self.assertReachesEventually(lambda: get_value(woken), 6) 1445 1446 # check state is not mucked up 1447 self.check_invariant(cond) 1448 1449 def test_notify_n(self): 1450 cond = self.Condition() 1451 sleeping = self.Semaphore(0) 1452 woken = self.Semaphore(0) 1453 1454 # start some threads/processes 1455 for i in range(3): 1456 p = self.Process(target=self.f, args=(cond, sleeping, woken)) 1457 p.daemon = True 1458 p.start() 1459 self.addCleanup(p.join) 1460 1461 t = threading.Thread(target=self.f, args=(cond, sleeping, woken)) 1462 t.daemon = True 1463 t.start() 1464 self.addCleanup(t.join) 1465 1466 # wait for them to all sleep 1467 for i in range(6): 1468 sleeping.acquire() 1469 1470 # check no process/thread has woken up 1471 time.sleep(DELTA) 1472 self.assertReturnsIfImplemented(0, get_value, woken) 1473 1474 # wake some of them up 1475 cond.acquire() 1476 cond.notify(n=2) 1477 cond.release() 1478 1479 # check 2 have woken 1480 self.assertReachesEventually(lambda: get_value(woken), 2) 1481 1482 # wake the rest of them 1483 cond.acquire() 1484 cond.notify(n=4) 1485 cond.release() 1486 1487 self.assertReachesEventually(lambda: get_value(woken), 6) 1488 1489 # doesn't do anything more 1490 cond.acquire() 1491 cond.notify(n=3) 1492 cond.release() 1493 1494 self.assertReturnsIfImplemented(6, get_value, woken) 1495 1496 # check state is not mucked up 1497 self.check_invariant(cond) 1498 1499 def test_timeout(self): 1500 cond = self.Condition() 1501 wait = TimingWrapper(cond.wait) 1502 cond.acquire() 1503 res = wait(TIMEOUT1) 1504 cond.release() 1505 self.assertEqual(res, False) 1506 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1) 1507 1508 @classmethod 1509 def _test_waitfor_f(cls, cond, state): 1510 with cond: 1511 state.value = 0 1512 cond.notify() 1513 result = cond.wait_for(lambda : state.value==4) 1514 if not result or state.value != 4: 1515 sys.exit(1) 1516 1517 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes') 1518 def test_waitfor(self): 1519 # based on test in test/lock_tests.py 1520 cond = self.Condition() 1521 state = self.Value('i', -1) 1522 1523 p = self.Process(target=self._test_waitfor_f, args=(cond, state)) 1524 p.daemon = True 1525 p.start() 1526 1527 with cond: 1528 result = cond.wait_for(lambda : state.value==0) 1529 self.assertTrue(result) 1530 self.assertEqual(state.value, 0) 1531 1532 for i in range(4): 1533 time.sleep(0.01) 1534 with cond: 1535 state.value += 1 1536 cond.notify() 1537 1538 join_process(p) 1539 self.assertEqual(p.exitcode, 0) 1540 1541 @classmethod 1542 def _test_waitfor_timeout_f(cls, cond, state, success, sem): 1543 sem.release() 1544 with cond: 1545 expected = 0.1 1546 dt = time.monotonic() 1547 result = cond.wait_for(lambda : state.value==4, timeout=expected) 1548 dt = time.monotonic() - dt 1549 # borrow logic in assertTimeout() from test/lock_tests.py 1550 if not result and expected * 0.6 < dt < expected * 10.0: 1551 success.value = True 1552 1553 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes') 1554 def test_waitfor_timeout(self): 1555 # based on test in test/lock_tests.py 1556 cond = self.Condition() 1557 state = self.Value('i', 0) 1558 success = self.Value('i', False) 1559 sem = self.Semaphore(0) 1560 1561 p = self.Process(target=self._test_waitfor_timeout_f, 1562 args=(cond, state, success, sem)) 1563 p.daemon = True 1564 p.start() 1565 self.assertTrue(sem.acquire(timeout=support.LONG_TIMEOUT)) 1566 1567 # Only increment 3 times, so state == 4 is never reached. 1568 for i in range(3): 1569 time.sleep(0.01) 1570 with cond: 1571 state.value += 1 1572 cond.notify() 1573 1574 join_process(p) 1575 self.assertTrue(success.value) 1576 1577 @classmethod 1578 def _test_wait_result(cls, c, pid): 1579 with c: 1580 c.notify() 1581 time.sleep(1) 1582 if pid is not None: 1583 os.kill(pid, signal.SIGINT) 1584 1585 def test_wait_result(self): 1586 if isinstance(self, ProcessesMixin) and sys.platform != 'win32': 1587 pid = os.getpid() 1588 else: 1589 pid = None 1590 1591 c = self.Condition() 1592 with c: 1593 self.assertFalse(c.wait(0)) 1594 self.assertFalse(c.wait(0.1)) 1595 1596 p = self.Process(target=self._test_wait_result, args=(c, pid)) 1597 p.start() 1598 1599 self.assertTrue(c.wait(60)) 1600 if pid is not None: 1601 self.assertRaises(KeyboardInterrupt, c.wait, 60) 1602 1603 p.join() 1604 1605 1606class _TestEvent(BaseTestCase): 1607 1608 @classmethod 1609 def _test_event(cls, event): 1610 time.sleep(TIMEOUT2) 1611 event.set() 1612 1613 def test_event(self): 1614 event = self.Event() 1615 wait = TimingWrapper(event.wait) 1616 1617 # Removed temporarily, due to API shear, this does not 1618 # work with threading._Event objects. is_set == isSet 1619 self.assertEqual(event.is_set(), False) 1620 1621 # Removed, threading.Event.wait() will return the value of the __flag 1622 # instead of None. API Shear with the semaphore backed mp.Event 1623 self.assertEqual(wait(0.0), False) 1624 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 1625 self.assertEqual(wait(TIMEOUT1), False) 1626 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1) 1627 1628 event.set() 1629 1630 # See note above on the API differences 1631 self.assertEqual(event.is_set(), True) 1632 self.assertEqual(wait(), True) 1633 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 1634 self.assertEqual(wait(TIMEOUT1), True) 1635 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 1636 # self.assertEqual(event.is_set(), True) 1637 1638 event.clear() 1639 1640 #self.assertEqual(event.is_set(), False) 1641 1642 p = self.Process(target=self._test_event, args=(event,)) 1643 p.daemon = True 1644 p.start() 1645 self.assertEqual(wait(), True) 1646 p.join() 1647 1648# 1649# Tests for Barrier - adapted from tests in test/lock_tests.py 1650# 1651 1652# Many of the tests for threading.Barrier use a list as an atomic 1653# counter: a value is appended to increment the counter, and the 1654# length of the list gives the value. We use the class DummyList 1655# for the same purpose. 1656 1657class _DummyList(object): 1658 1659 def __init__(self): 1660 wrapper = multiprocessing.heap.BufferWrapper(struct.calcsize('i')) 1661 lock = multiprocessing.Lock() 1662 self.__setstate__((wrapper, lock)) 1663 self._lengthbuf[0] = 0 1664 1665 def __setstate__(self, state): 1666 (self._wrapper, self._lock) = state 1667 self._lengthbuf = self._wrapper.create_memoryview().cast('i') 1668 1669 def __getstate__(self): 1670 return (self._wrapper, self._lock) 1671 1672 def append(self, _): 1673 with self._lock: 1674 self._lengthbuf[0] += 1 1675 1676 def __len__(self): 1677 with self._lock: 1678 return self._lengthbuf[0] 1679 1680def _wait(): 1681 # A crude wait/yield function not relying on synchronization primitives. 1682 time.sleep(0.01) 1683 1684 1685class Bunch(object): 1686 """ 1687 A bunch of threads. 1688 """ 1689 def __init__(self, namespace, f, args, n, wait_before_exit=False): 1690 """ 1691 Construct a bunch of `n` threads running the same function `f`. 1692 If `wait_before_exit` is True, the threads won't terminate until 1693 do_finish() is called. 1694 """ 1695 self.f = f 1696 self.args = args 1697 self.n = n 1698 self.started = namespace.DummyList() 1699 self.finished = namespace.DummyList() 1700 self._can_exit = namespace.Event() 1701 if not wait_before_exit: 1702 self._can_exit.set() 1703 1704 threads = [] 1705 for i in range(n): 1706 p = namespace.Process(target=self.task) 1707 p.daemon = True 1708 p.start() 1709 threads.append(p) 1710 1711 def finalize(threads): 1712 for p in threads: 1713 p.join() 1714 1715 self._finalizer = weakref.finalize(self, finalize, threads) 1716 1717 def task(self): 1718 pid = os.getpid() 1719 self.started.append(pid) 1720 try: 1721 self.f(*self.args) 1722 finally: 1723 self.finished.append(pid) 1724 self._can_exit.wait(30) 1725 assert self._can_exit.is_set() 1726 1727 def wait_for_started(self): 1728 while len(self.started) < self.n: 1729 _wait() 1730 1731 def wait_for_finished(self): 1732 while len(self.finished) < self.n: 1733 _wait() 1734 1735 def do_finish(self): 1736 self._can_exit.set() 1737 1738 def close(self): 1739 self._finalizer() 1740 1741 1742class AppendTrue(object): 1743 def __init__(self, obj): 1744 self.obj = obj 1745 def __call__(self): 1746 self.obj.append(True) 1747 1748 1749class _TestBarrier(BaseTestCase): 1750 """ 1751 Tests for Barrier objects. 1752 """ 1753 N = 5 1754 defaultTimeout = 30.0 # XXX Slow Windows buildbots need generous timeout 1755 1756 def setUp(self): 1757 self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout) 1758 1759 def tearDown(self): 1760 self.barrier.abort() 1761 self.barrier = None 1762 1763 def DummyList(self): 1764 if self.TYPE == 'threads': 1765 return [] 1766 elif self.TYPE == 'manager': 1767 return self.manager.list() 1768 else: 1769 return _DummyList() 1770 1771 def run_threads(self, f, args): 1772 b = Bunch(self, f, args, self.N-1) 1773 try: 1774 f(*args) 1775 b.wait_for_finished() 1776 finally: 1777 b.close() 1778 1779 @classmethod 1780 def multipass(cls, barrier, results, n): 1781 m = barrier.parties 1782 assert m == cls.N 1783 for i in range(n): 1784 results[0].append(True) 1785 assert len(results[1]) == i * m 1786 barrier.wait() 1787 results[1].append(True) 1788 assert len(results[0]) == (i + 1) * m 1789 barrier.wait() 1790 try: 1791 assert barrier.n_waiting == 0 1792 except NotImplementedError: 1793 pass 1794 assert not barrier.broken 1795 1796 def test_barrier(self, passes=1): 1797 """ 1798 Test that a barrier is passed in lockstep 1799 """ 1800 results = [self.DummyList(), self.DummyList()] 1801 self.run_threads(self.multipass, (self.barrier, results, passes)) 1802 1803 def test_barrier_10(self): 1804 """ 1805 Test that a barrier works for 10 consecutive runs 1806 """ 1807 return self.test_barrier(10) 1808 1809 @classmethod 1810 def _test_wait_return_f(cls, barrier, queue): 1811 res = barrier.wait() 1812 queue.put(res) 1813 1814 def test_wait_return(self): 1815 """ 1816 test the return value from barrier.wait 1817 """ 1818 queue = self.Queue() 1819 self.run_threads(self._test_wait_return_f, (self.barrier, queue)) 1820 results = [queue.get() for i in range(self.N)] 1821 self.assertEqual(results.count(0), 1) 1822 close_queue(queue) 1823 1824 @classmethod 1825 def _test_action_f(cls, barrier, results): 1826 barrier.wait() 1827 if len(results) != 1: 1828 raise RuntimeError 1829 1830 def test_action(self): 1831 """ 1832 Test the 'action' callback 1833 """ 1834 results = self.DummyList() 1835 barrier = self.Barrier(self.N, action=AppendTrue(results)) 1836 self.run_threads(self._test_action_f, (barrier, results)) 1837 self.assertEqual(len(results), 1) 1838 1839 @classmethod 1840 def _test_abort_f(cls, barrier, results1, results2): 1841 try: 1842 i = barrier.wait() 1843 if i == cls.N//2: 1844 raise RuntimeError 1845 barrier.wait() 1846 results1.append(True) 1847 except threading.BrokenBarrierError: 1848 results2.append(True) 1849 except RuntimeError: 1850 barrier.abort() 1851 1852 def test_abort(self): 1853 """ 1854 Test that an abort will put the barrier in a broken state 1855 """ 1856 results1 = self.DummyList() 1857 results2 = self.DummyList() 1858 self.run_threads(self._test_abort_f, 1859 (self.barrier, results1, results2)) 1860 self.assertEqual(len(results1), 0) 1861 self.assertEqual(len(results2), self.N-1) 1862 self.assertTrue(self.barrier.broken) 1863 1864 @classmethod 1865 def _test_reset_f(cls, barrier, results1, results2, results3): 1866 i = barrier.wait() 1867 if i == cls.N//2: 1868 # Wait until the other threads are all in the barrier. 1869 while barrier.n_waiting < cls.N-1: 1870 time.sleep(0.001) 1871 barrier.reset() 1872 else: 1873 try: 1874 barrier.wait() 1875 results1.append(True) 1876 except threading.BrokenBarrierError: 1877 results2.append(True) 1878 # Now, pass the barrier again 1879 barrier.wait() 1880 results3.append(True) 1881 1882 def test_reset(self): 1883 """ 1884 Test that a 'reset' on a barrier frees the waiting threads 1885 """ 1886 results1 = self.DummyList() 1887 results2 = self.DummyList() 1888 results3 = self.DummyList() 1889 self.run_threads(self._test_reset_f, 1890 (self.barrier, results1, results2, results3)) 1891 self.assertEqual(len(results1), 0) 1892 self.assertEqual(len(results2), self.N-1) 1893 self.assertEqual(len(results3), self.N) 1894 1895 @classmethod 1896 def _test_abort_and_reset_f(cls, barrier, barrier2, 1897 results1, results2, results3): 1898 try: 1899 i = barrier.wait() 1900 if i == cls.N//2: 1901 raise RuntimeError 1902 barrier.wait() 1903 results1.append(True) 1904 except threading.BrokenBarrierError: 1905 results2.append(True) 1906 except RuntimeError: 1907 barrier.abort() 1908 # Synchronize and reset the barrier. Must synchronize first so 1909 # that everyone has left it when we reset, and after so that no 1910 # one enters it before the reset. 1911 if barrier2.wait() == cls.N//2: 1912 barrier.reset() 1913 barrier2.wait() 1914 barrier.wait() 1915 results3.append(True) 1916 1917 def test_abort_and_reset(self): 1918 """ 1919 Test that a barrier can be reset after being broken. 1920 """ 1921 results1 = self.DummyList() 1922 results2 = self.DummyList() 1923 results3 = self.DummyList() 1924 barrier2 = self.Barrier(self.N) 1925 1926 self.run_threads(self._test_abort_and_reset_f, 1927 (self.barrier, barrier2, results1, results2, results3)) 1928 self.assertEqual(len(results1), 0) 1929 self.assertEqual(len(results2), self.N-1) 1930 self.assertEqual(len(results3), self.N) 1931 1932 @classmethod 1933 def _test_timeout_f(cls, barrier, results): 1934 i = barrier.wait() 1935 if i == cls.N//2: 1936 # One thread is late! 1937 time.sleep(1.0) 1938 try: 1939 barrier.wait(0.5) 1940 except threading.BrokenBarrierError: 1941 results.append(True) 1942 1943 def test_timeout(self): 1944 """ 1945 Test wait(timeout) 1946 """ 1947 results = self.DummyList() 1948 self.run_threads(self._test_timeout_f, (self.barrier, results)) 1949 self.assertEqual(len(results), self.barrier.parties) 1950 1951 @classmethod 1952 def _test_default_timeout_f(cls, barrier, results): 1953 i = barrier.wait(cls.defaultTimeout) 1954 if i == cls.N//2: 1955 # One thread is later than the default timeout 1956 time.sleep(1.0) 1957 try: 1958 barrier.wait() 1959 except threading.BrokenBarrierError: 1960 results.append(True) 1961 1962 def test_default_timeout(self): 1963 """ 1964 Test the barrier's default timeout 1965 """ 1966 barrier = self.Barrier(self.N, timeout=0.5) 1967 results = self.DummyList() 1968 self.run_threads(self._test_default_timeout_f, (barrier, results)) 1969 self.assertEqual(len(results), barrier.parties) 1970 1971 def test_single_thread(self): 1972 b = self.Barrier(1) 1973 b.wait() 1974 b.wait() 1975 1976 @classmethod 1977 def _test_thousand_f(cls, barrier, passes, conn, lock): 1978 for i in range(passes): 1979 barrier.wait() 1980 with lock: 1981 conn.send(i) 1982 1983 def test_thousand(self): 1984 if self.TYPE == 'manager': 1985 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1986 passes = 1000 1987 lock = self.Lock() 1988 conn, child_conn = self.Pipe(False) 1989 for j in range(self.N): 1990 p = self.Process(target=self._test_thousand_f, 1991 args=(self.barrier, passes, child_conn, lock)) 1992 p.start() 1993 self.addCleanup(p.join) 1994 1995 for i in range(passes): 1996 for j in range(self.N): 1997 self.assertEqual(conn.recv(), i) 1998 1999# 2000# 2001# 2002 2003class _TestValue(BaseTestCase): 2004 2005 ALLOWED_TYPES = ('processes',) 2006 2007 codes_values = [ 2008 ('i', 4343, 24234), 2009 ('d', 3.625, -4.25), 2010 ('h', -232, 234), 2011 ('q', 2 ** 33, 2 ** 34), 2012 ('c', latin('x'), latin('y')) 2013 ] 2014 2015 def setUp(self): 2016 if not HAS_SHAREDCTYPES: 2017 self.skipTest("requires multiprocessing.sharedctypes") 2018 2019 @classmethod 2020 def _test(cls, values): 2021 for sv, cv in zip(values, cls.codes_values): 2022 sv.value = cv[2] 2023 2024 2025 def test_value(self, raw=False): 2026 if raw: 2027 values = [self.RawValue(code, value) 2028 for code, value, _ in self.codes_values] 2029 else: 2030 values = [self.Value(code, value) 2031 for code, value, _ in self.codes_values] 2032 2033 for sv, cv in zip(values, self.codes_values): 2034 self.assertEqual(sv.value, cv[1]) 2035 2036 proc = self.Process(target=self._test, args=(values,)) 2037 proc.daemon = True 2038 proc.start() 2039 proc.join() 2040 2041 for sv, cv in zip(values, self.codes_values): 2042 self.assertEqual(sv.value, cv[2]) 2043 2044 def test_rawvalue(self): 2045 self.test_value(raw=True) 2046 2047 def test_getobj_getlock(self): 2048 val1 = self.Value('i', 5) 2049 lock1 = val1.get_lock() 2050 obj1 = val1.get_obj() 2051 2052 val2 = self.Value('i', 5, lock=None) 2053 lock2 = val2.get_lock() 2054 obj2 = val2.get_obj() 2055 2056 lock = self.Lock() 2057 val3 = self.Value('i', 5, lock=lock) 2058 lock3 = val3.get_lock() 2059 obj3 = val3.get_obj() 2060 self.assertEqual(lock, lock3) 2061 2062 arr4 = self.Value('i', 5, lock=False) 2063 self.assertFalse(hasattr(arr4, 'get_lock')) 2064 self.assertFalse(hasattr(arr4, 'get_obj')) 2065 2066 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue') 2067 2068 arr5 = self.RawValue('i', 5) 2069 self.assertFalse(hasattr(arr5, 'get_lock')) 2070 self.assertFalse(hasattr(arr5, 'get_obj')) 2071 2072 2073class _TestArray(BaseTestCase): 2074 2075 ALLOWED_TYPES = ('processes',) 2076 2077 @classmethod 2078 def f(cls, seq): 2079 for i in range(1, len(seq)): 2080 seq[i] += seq[i-1] 2081 2082 @unittest.skipIf(c_int is None, "requires _ctypes") 2083 def test_array(self, raw=False): 2084 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831] 2085 if raw: 2086 arr = self.RawArray('i', seq) 2087 else: 2088 arr = self.Array('i', seq) 2089 2090 self.assertEqual(len(arr), len(seq)) 2091 self.assertEqual(arr[3], seq[3]) 2092 self.assertEqual(list(arr[2:7]), list(seq[2:7])) 2093 2094 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4]) 2095 2096 self.assertEqual(list(arr[:]), seq) 2097 2098 self.f(seq) 2099 2100 p = self.Process(target=self.f, args=(arr,)) 2101 p.daemon = True 2102 p.start() 2103 p.join() 2104 2105 self.assertEqual(list(arr[:]), seq) 2106 2107 @unittest.skipIf(c_int is None, "requires _ctypes") 2108 def test_array_from_size(self): 2109 size = 10 2110 # Test for zeroing (see issue #11675). 2111 # The repetition below strengthens the test by increasing the chances 2112 # of previously allocated non-zero memory being used for the new array 2113 # on the 2nd and 3rd loops. 2114 for _ in range(3): 2115 arr = self.Array('i', size) 2116 self.assertEqual(len(arr), size) 2117 self.assertEqual(list(arr), [0] * size) 2118 arr[:] = range(10) 2119 self.assertEqual(list(arr), list(range(10))) 2120 del arr 2121 2122 @unittest.skipIf(c_int is None, "requires _ctypes") 2123 def test_rawarray(self): 2124 self.test_array(raw=True) 2125 2126 @unittest.skipIf(c_int is None, "requires _ctypes") 2127 def test_getobj_getlock_obj(self): 2128 arr1 = self.Array('i', list(range(10))) 2129 lock1 = arr1.get_lock() 2130 obj1 = arr1.get_obj() 2131 2132 arr2 = self.Array('i', list(range(10)), lock=None) 2133 lock2 = arr2.get_lock() 2134 obj2 = arr2.get_obj() 2135 2136 lock = self.Lock() 2137 arr3 = self.Array('i', list(range(10)), lock=lock) 2138 lock3 = arr3.get_lock() 2139 obj3 = arr3.get_obj() 2140 self.assertEqual(lock, lock3) 2141 2142 arr4 = self.Array('i', range(10), lock=False) 2143 self.assertFalse(hasattr(arr4, 'get_lock')) 2144 self.assertFalse(hasattr(arr4, 'get_obj')) 2145 self.assertRaises(AttributeError, 2146 self.Array, 'i', range(10), lock='notalock') 2147 2148 arr5 = self.RawArray('i', range(10)) 2149 self.assertFalse(hasattr(arr5, 'get_lock')) 2150 self.assertFalse(hasattr(arr5, 'get_obj')) 2151 2152# 2153# 2154# 2155 2156class _TestContainers(BaseTestCase): 2157 2158 ALLOWED_TYPES = ('manager',) 2159 2160 def test_list(self): 2161 a = self.list(list(range(10))) 2162 self.assertEqual(a[:], list(range(10))) 2163 2164 b = self.list() 2165 self.assertEqual(b[:], []) 2166 2167 b.extend(list(range(5))) 2168 self.assertEqual(b[:], list(range(5))) 2169 2170 self.assertEqual(b[2], 2) 2171 self.assertEqual(b[2:10], [2,3,4]) 2172 2173 b *= 2 2174 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]) 2175 2176 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6]) 2177 2178 self.assertEqual(a[:], list(range(10))) 2179 2180 d = [a, b] 2181 e = self.list(d) 2182 self.assertEqual( 2183 [element[:] for element in e], 2184 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]] 2185 ) 2186 2187 f = self.list([a]) 2188 a.append('hello') 2189 self.assertEqual(f[0][:], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']) 2190 2191 def test_list_iter(self): 2192 a = self.list(list(range(10))) 2193 it = iter(a) 2194 self.assertEqual(list(it), list(range(10))) 2195 self.assertEqual(list(it), []) # exhausted 2196 # list modified during iteration 2197 it = iter(a) 2198 a[0] = 100 2199 self.assertEqual(next(it), 100) 2200 2201 def test_list_proxy_in_list(self): 2202 a = self.list([self.list(range(3)) for _i in range(3)]) 2203 self.assertEqual([inner[:] for inner in a], [[0, 1, 2]] * 3) 2204 2205 a[0][-1] = 55 2206 self.assertEqual(a[0][:], [0, 1, 55]) 2207 for i in range(1, 3): 2208 self.assertEqual(a[i][:], [0, 1, 2]) 2209 2210 self.assertEqual(a[1].pop(), 2) 2211 self.assertEqual(len(a[1]), 2) 2212 for i in range(0, 3, 2): 2213 self.assertEqual(len(a[i]), 3) 2214 2215 del a 2216 2217 b = self.list() 2218 b.append(b) 2219 del b 2220 2221 def test_dict(self): 2222 d = self.dict() 2223 indices = list(range(65, 70)) 2224 for i in indices: 2225 d[i] = chr(i) 2226 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices)) 2227 self.assertEqual(sorted(d.keys()), indices) 2228 self.assertEqual(sorted(d.values()), [chr(i) for i in indices]) 2229 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices]) 2230 2231 def test_dict_iter(self): 2232 d = self.dict() 2233 indices = list(range(65, 70)) 2234 for i in indices: 2235 d[i] = chr(i) 2236 it = iter(d) 2237 self.assertEqual(list(it), indices) 2238 self.assertEqual(list(it), []) # exhausted 2239 # dictionary changed size during iteration 2240 it = iter(d) 2241 d.clear() 2242 self.assertRaises(RuntimeError, next, it) 2243 2244 def test_dict_proxy_nested(self): 2245 pets = self.dict(ferrets=2, hamsters=4) 2246 supplies = self.dict(water=10, feed=3) 2247 d = self.dict(pets=pets, supplies=supplies) 2248 2249 self.assertEqual(supplies['water'], 10) 2250 self.assertEqual(d['supplies']['water'], 10) 2251 2252 d['supplies']['blankets'] = 5 2253 self.assertEqual(supplies['blankets'], 5) 2254 self.assertEqual(d['supplies']['blankets'], 5) 2255 2256 d['supplies']['water'] = 7 2257 self.assertEqual(supplies['water'], 7) 2258 self.assertEqual(d['supplies']['water'], 7) 2259 2260 del pets 2261 del supplies 2262 self.assertEqual(d['pets']['ferrets'], 2) 2263 d['supplies']['blankets'] = 11 2264 self.assertEqual(d['supplies']['blankets'], 11) 2265 2266 pets = d['pets'] 2267 supplies = d['supplies'] 2268 supplies['water'] = 7 2269 self.assertEqual(supplies['water'], 7) 2270 self.assertEqual(d['supplies']['water'], 7) 2271 2272 d.clear() 2273 self.assertEqual(len(d), 0) 2274 self.assertEqual(supplies['water'], 7) 2275 self.assertEqual(pets['hamsters'], 4) 2276 2277 l = self.list([pets, supplies]) 2278 l[0]['marmots'] = 1 2279 self.assertEqual(pets['marmots'], 1) 2280 self.assertEqual(l[0]['marmots'], 1) 2281 2282 del pets 2283 del supplies 2284 self.assertEqual(l[0]['marmots'], 1) 2285 2286 outer = self.list([[88, 99], l]) 2287 self.assertIsInstance(outer[0], list) # Not a ListProxy 2288 self.assertEqual(outer[-1][-1]['feed'], 3) 2289 2290 def test_nested_queue(self): 2291 a = self.list() # Test queue inside list 2292 a.append(self.Queue()) 2293 a[0].put(123) 2294 self.assertEqual(a[0].get(), 123) 2295 b = self.dict() # Test queue inside dict 2296 b[0] = self.Queue() 2297 b[0].put(456) 2298 self.assertEqual(b[0].get(), 456) 2299 2300 def test_namespace(self): 2301 n = self.Namespace() 2302 n.name = 'Bob' 2303 n.job = 'Builder' 2304 n._hidden = 'hidden' 2305 self.assertEqual((n.name, n.job), ('Bob', 'Builder')) 2306 del n.job 2307 self.assertEqual(str(n), "Namespace(name='Bob')") 2308 self.assertTrue(hasattr(n, 'name')) 2309 self.assertTrue(not hasattr(n, 'job')) 2310 2311# 2312# 2313# 2314 2315def sqr(x, wait=0.0): 2316 time.sleep(wait) 2317 return x*x 2318 2319def mul(x, y): 2320 return x*y 2321 2322def raise_large_valuerror(wait): 2323 time.sleep(wait) 2324 raise ValueError("x" * 1024**2) 2325 2326def identity(x): 2327 return x 2328 2329class CountedObject(object): 2330 n_instances = 0 2331 2332 def __new__(cls): 2333 cls.n_instances += 1 2334 return object.__new__(cls) 2335 2336 def __del__(self): 2337 type(self).n_instances -= 1 2338 2339class SayWhenError(ValueError): pass 2340 2341def exception_throwing_generator(total, when): 2342 if when == -1: 2343 raise SayWhenError("Somebody said when") 2344 for i in range(total): 2345 if i == when: 2346 raise SayWhenError("Somebody said when") 2347 yield i 2348 2349 2350class _TestPool(BaseTestCase): 2351 2352 @classmethod 2353 def setUpClass(cls): 2354 super().setUpClass() 2355 cls.pool = cls.Pool(4) 2356 2357 @classmethod 2358 def tearDownClass(cls): 2359 cls.pool.terminate() 2360 cls.pool.join() 2361 cls.pool = None 2362 super().tearDownClass() 2363 2364 def test_apply(self): 2365 papply = self.pool.apply 2366 self.assertEqual(papply(sqr, (5,)), sqr(5)) 2367 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3)) 2368 2369 def test_map(self): 2370 pmap = self.pool.map 2371 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10))))) 2372 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20), 2373 list(map(sqr, list(range(100))))) 2374 2375 def test_starmap(self): 2376 psmap = self.pool.starmap 2377 tuples = list(zip(range(10), range(9,-1, -1))) 2378 self.assertEqual(psmap(mul, tuples), 2379 list(itertools.starmap(mul, tuples))) 2380 tuples = list(zip(range(100), range(99,-1, -1))) 2381 self.assertEqual(psmap(mul, tuples, chunksize=20), 2382 list(itertools.starmap(mul, tuples))) 2383 2384 def test_starmap_async(self): 2385 tuples = list(zip(range(100), range(99,-1, -1))) 2386 self.assertEqual(self.pool.starmap_async(mul, tuples).get(), 2387 list(itertools.starmap(mul, tuples))) 2388 2389 def test_map_async(self): 2390 self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(), 2391 list(map(sqr, list(range(10))))) 2392 2393 def test_map_async_callbacks(self): 2394 call_args = self.manager.list() if self.TYPE == 'manager' else [] 2395 self.pool.map_async(int, ['1'], 2396 callback=call_args.append, 2397 error_callback=call_args.append).wait() 2398 self.assertEqual(1, len(call_args)) 2399 self.assertEqual([1], call_args[0]) 2400 self.pool.map_async(int, ['a'], 2401 callback=call_args.append, 2402 error_callback=call_args.append).wait() 2403 self.assertEqual(2, len(call_args)) 2404 self.assertIsInstance(call_args[1], ValueError) 2405 2406 def test_map_unplicklable(self): 2407 # Issue #19425 -- failure to pickle should not cause a hang 2408 if self.TYPE == 'threads': 2409 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 2410 class A(object): 2411 def __reduce__(self): 2412 raise RuntimeError('cannot pickle') 2413 with self.assertRaises(RuntimeError): 2414 self.pool.map(sqr, [A()]*10) 2415 2416 def test_map_chunksize(self): 2417 try: 2418 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1) 2419 except multiprocessing.TimeoutError: 2420 self.fail("pool.map_async with chunksize stalled on null list") 2421 2422 def test_map_handle_iterable_exception(self): 2423 if self.TYPE == 'manager': 2424 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 2425 2426 # SayWhenError seen at the very first of the iterable 2427 with self.assertRaises(SayWhenError): 2428 self.pool.map(sqr, exception_throwing_generator(1, -1), 1) 2429 # again, make sure it's reentrant 2430 with self.assertRaises(SayWhenError): 2431 self.pool.map(sqr, exception_throwing_generator(1, -1), 1) 2432 2433 with self.assertRaises(SayWhenError): 2434 self.pool.map(sqr, exception_throwing_generator(10, 3), 1) 2435 2436 class SpecialIterable: 2437 def __iter__(self): 2438 return self 2439 def __next__(self): 2440 raise SayWhenError 2441 def __len__(self): 2442 return 1 2443 with self.assertRaises(SayWhenError): 2444 self.pool.map(sqr, SpecialIterable(), 1) 2445 with self.assertRaises(SayWhenError): 2446 self.pool.map(sqr, SpecialIterable(), 1) 2447 2448 def test_async(self): 2449 res = self.pool.apply_async(sqr, (7, TIMEOUT1,)) 2450 get = TimingWrapper(res.get) 2451 self.assertEqual(get(), 49) 2452 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1) 2453 2454 def test_async_timeout(self): 2455 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0)) 2456 get = TimingWrapper(res.get) 2457 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2) 2458 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2) 2459 2460 def test_imap(self): 2461 it = self.pool.imap(sqr, list(range(10))) 2462 self.assertEqual(list(it), list(map(sqr, list(range(10))))) 2463 2464 it = self.pool.imap(sqr, list(range(10))) 2465 for i in range(10): 2466 self.assertEqual(next(it), i*i) 2467 self.assertRaises(StopIteration, it.__next__) 2468 2469 it = self.pool.imap(sqr, list(range(1000)), chunksize=100) 2470 for i in range(1000): 2471 self.assertEqual(next(it), i*i) 2472 self.assertRaises(StopIteration, it.__next__) 2473 2474 def test_imap_handle_iterable_exception(self): 2475 if self.TYPE == 'manager': 2476 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 2477 2478 # SayWhenError seen at the very first of the iterable 2479 it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1) 2480 self.assertRaises(SayWhenError, it.__next__) 2481 # again, make sure it's reentrant 2482 it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1) 2483 self.assertRaises(SayWhenError, it.__next__) 2484 2485 it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1) 2486 for i in range(3): 2487 self.assertEqual(next(it), i*i) 2488 self.assertRaises(SayWhenError, it.__next__) 2489 2490 # SayWhenError seen at start of problematic chunk's results 2491 it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2) 2492 for i in range(6): 2493 self.assertEqual(next(it), i*i) 2494 self.assertRaises(SayWhenError, it.__next__) 2495 it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4) 2496 for i in range(4): 2497 self.assertEqual(next(it), i*i) 2498 self.assertRaises(SayWhenError, it.__next__) 2499 2500 def test_imap_unordered(self): 2501 it = self.pool.imap_unordered(sqr, list(range(10))) 2502 self.assertEqual(sorted(it), list(map(sqr, list(range(10))))) 2503 2504 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=100) 2505 self.assertEqual(sorted(it), list(map(sqr, list(range(1000))))) 2506 2507 def test_imap_unordered_handle_iterable_exception(self): 2508 if self.TYPE == 'manager': 2509 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 2510 2511 # SayWhenError seen at the very first of the iterable 2512 it = self.pool.imap_unordered(sqr, 2513 exception_throwing_generator(1, -1), 2514 1) 2515 self.assertRaises(SayWhenError, it.__next__) 2516 # again, make sure it's reentrant 2517 it = self.pool.imap_unordered(sqr, 2518 exception_throwing_generator(1, -1), 2519 1) 2520 self.assertRaises(SayWhenError, it.__next__) 2521 2522 it = self.pool.imap_unordered(sqr, 2523 exception_throwing_generator(10, 3), 2524 1) 2525 expected_values = list(map(sqr, list(range(10)))) 2526 with self.assertRaises(SayWhenError): 2527 # imap_unordered makes it difficult to anticipate the SayWhenError 2528 for i in range(10): 2529 value = next(it) 2530 self.assertIn(value, expected_values) 2531 expected_values.remove(value) 2532 2533 it = self.pool.imap_unordered(sqr, 2534 exception_throwing_generator(20, 7), 2535 2) 2536 expected_values = list(map(sqr, list(range(20)))) 2537 with self.assertRaises(SayWhenError): 2538 for i in range(20): 2539 value = next(it) 2540 self.assertIn(value, expected_values) 2541 expected_values.remove(value) 2542 2543 def test_make_pool(self): 2544 expected_error = (RemoteError if self.TYPE == 'manager' 2545 else ValueError) 2546 2547 self.assertRaises(expected_error, self.Pool, -1) 2548 self.assertRaises(expected_error, self.Pool, 0) 2549 2550 if self.TYPE != 'manager': 2551 p = self.Pool(3) 2552 try: 2553 self.assertEqual(3, len(p._pool)) 2554 finally: 2555 p.close() 2556 p.join() 2557 2558 def test_terminate(self): 2559 result = self.pool.map_async( 2560 time.sleep, [0.1 for i in range(10000)], chunksize=1 2561 ) 2562 self.pool.terminate() 2563 join = TimingWrapper(self.pool.join) 2564 join() 2565 # Sanity check the pool didn't wait for all tasks to finish 2566 self.assertLess(join.elapsed, 2.0) 2567 2568 def test_empty_iterable(self): 2569 # See Issue 12157 2570 p = self.Pool(1) 2571 2572 self.assertEqual(p.map(sqr, []), []) 2573 self.assertEqual(list(p.imap(sqr, [])), []) 2574 self.assertEqual(list(p.imap_unordered(sqr, [])), []) 2575 self.assertEqual(p.map_async(sqr, []).get(), []) 2576 2577 p.close() 2578 p.join() 2579 2580 def test_context(self): 2581 if self.TYPE == 'processes': 2582 L = list(range(10)) 2583 expected = [sqr(i) for i in L] 2584 with self.Pool(2) as p: 2585 r = p.map_async(sqr, L) 2586 self.assertEqual(r.get(), expected) 2587 p.join() 2588 self.assertRaises(ValueError, p.map_async, sqr, L) 2589 2590 @classmethod 2591 def _test_traceback(cls): 2592 raise RuntimeError(123) # some comment 2593 2594 def test_traceback(self): 2595 # We want ensure that the traceback from the child process is 2596 # contained in the traceback raised in the main process. 2597 if self.TYPE == 'processes': 2598 with self.Pool(1) as p: 2599 try: 2600 p.apply(self._test_traceback) 2601 except Exception as e: 2602 exc = e 2603 else: 2604 self.fail('expected RuntimeError') 2605 p.join() 2606 self.assertIs(type(exc), RuntimeError) 2607 self.assertEqual(exc.args, (123,)) 2608 cause = exc.__cause__ 2609 self.assertIs(type(cause), multiprocessing.pool.RemoteTraceback) 2610 self.assertIn('raise RuntimeError(123) # some comment', cause.tb) 2611 2612 with test.support.captured_stderr() as f1: 2613 try: 2614 raise exc 2615 except RuntimeError: 2616 sys.excepthook(*sys.exc_info()) 2617 self.assertIn('raise RuntimeError(123) # some comment', 2618 f1.getvalue()) 2619 # _helper_reraises_exception should not make the error 2620 # a remote exception 2621 with self.Pool(1) as p: 2622 try: 2623 p.map(sqr, exception_throwing_generator(1, -1), 1) 2624 except Exception as e: 2625 exc = e 2626 else: 2627 self.fail('expected SayWhenError') 2628 self.assertIs(type(exc), SayWhenError) 2629 self.assertIs(exc.__cause__, None) 2630 p.join() 2631 2632 @classmethod 2633 def _test_wrapped_exception(cls): 2634 raise RuntimeError('foo') 2635 2636 def test_wrapped_exception(self): 2637 # Issue #20980: Should not wrap exception when using thread pool 2638 with self.Pool(1) as p: 2639 with self.assertRaises(RuntimeError): 2640 p.apply(self._test_wrapped_exception) 2641 p.join() 2642 2643 def test_map_no_failfast(self): 2644 # Issue #23992: the fail-fast behaviour when an exception is raised 2645 # during map() would make Pool.join() deadlock, because a worker 2646 # process would fill the result queue (after the result handler thread 2647 # terminated, hence not draining it anymore). 2648 2649 t_start = time.monotonic() 2650 2651 with self.assertRaises(ValueError): 2652 with self.Pool(2) as p: 2653 try: 2654 p.map(raise_large_valuerror, [0, 1]) 2655 finally: 2656 time.sleep(0.5) 2657 p.close() 2658 p.join() 2659 2660 # check that we indeed waited for all jobs 2661 self.assertGreater(time.monotonic() - t_start, 0.9) 2662 2663 def test_release_task_refs(self): 2664 # Issue #29861: task arguments and results should not be kept 2665 # alive after we are done with them. 2666 objs = [CountedObject() for i in range(10)] 2667 refs = [weakref.ref(o) for o in objs] 2668 self.pool.map(identity, objs) 2669 2670 del objs 2671 gc.collect() # For PyPy or other GCs. 2672 time.sleep(DELTA) # let threaded cleanup code run 2673 self.assertEqual(set(wr() for wr in refs), {None}) 2674 # With a process pool, copies of the objects are returned, check 2675 # they were released too. 2676 self.assertEqual(CountedObject.n_instances, 0) 2677 2678 def test_enter(self): 2679 if self.TYPE == 'manager': 2680 self.skipTest("test not applicable to manager") 2681 2682 pool = self.Pool(1) 2683 with pool: 2684 pass 2685 # call pool.terminate() 2686 # pool is no longer running 2687 2688 with self.assertRaises(ValueError): 2689 # bpo-35477: pool.__enter__() fails if the pool is not running 2690 with pool: 2691 pass 2692 pool.join() 2693 2694 def test_resource_warning(self): 2695 if self.TYPE == 'manager': 2696 self.skipTest("test not applicable to manager") 2697 2698 pool = self.Pool(1) 2699 pool.terminate() 2700 pool.join() 2701 2702 # force state to RUN to emit ResourceWarning in __del__() 2703 pool._state = multiprocessing.pool.RUN 2704 2705 with warnings_helper.check_warnings( 2706 ('unclosed running multiprocessing pool', ResourceWarning)): 2707 pool = None 2708 support.gc_collect() 2709 2710def raising(): 2711 raise KeyError("key") 2712 2713def unpickleable_result(): 2714 return lambda: 42 2715 2716class _TestPoolWorkerErrors(BaseTestCase): 2717 ALLOWED_TYPES = ('processes', ) 2718 2719 def test_async_error_callback(self): 2720 p = multiprocessing.Pool(2) 2721 2722 scratchpad = [None] 2723 def errback(exc): 2724 scratchpad[0] = exc 2725 2726 res = p.apply_async(raising, error_callback=errback) 2727 self.assertRaises(KeyError, res.get) 2728 self.assertTrue(scratchpad[0]) 2729 self.assertIsInstance(scratchpad[0], KeyError) 2730 2731 p.close() 2732 p.join() 2733 2734 def test_unpickleable_result(self): 2735 from multiprocessing.pool import MaybeEncodingError 2736 p = multiprocessing.Pool(2) 2737 2738 # Make sure we don't lose pool processes because of encoding errors. 2739 for iteration in range(20): 2740 2741 scratchpad = [None] 2742 def errback(exc): 2743 scratchpad[0] = exc 2744 2745 res = p.apply_async(unpickleable_result, error_callback=errback) 2746 self.assertRaises(MaybeEncodingError, res.get) 2747 wrapped = scratchpad[0] 2748 self.assertTrue(wrapped) 2749 self.assertIsInstance(scratchpad[0], MaybeEncodingError) 2750 self.assertIsNotNone(wrapped.exc) 2751 self.assertIsNotNone(wrapped.value) 2752 2753 p.close() 2754 p.join() 2755 2756class _TestPoolWorkerLifetime(BaseTestCase): 2757 ALLOWED_TYPES = ('processes', ) 2758 2759 def test_pool_worker_lifetime(self): 2760 p = multiprocessing.Pool(3, maxtasksperchild=10) 2761 self.assertEqual(3, len(p._pool)) 2762 origworkerpids = [w.pid for w in p._pool] 2763 # Run many tasks so each worker gets replaced (hopefully) 2764 results = [] 2765 for i in range(100): 2766 results.append(p.apply_async(sqr, (i, ))) 2767 # Fetch the results and verify we got the right answers, 2768 # also ensuring all the tasks have completed. 2769 for (j, res) in enumerate(results): 2770 self.assertEqual(res.get(), sqr(j)) 2771 # Refill the pool 2772 p._repopulate_pool() 2773 # Wait until all workers are alive 2774 # (countdown * DELTA = 5 seconds max startup process time) 2775 countdown = 50 2776 while countdown and not all(w.is_alive() for w in p._pool): 2777 countdown -= 1 2778 time.sleep(DELTA) 2779 finalworkerpids = [w.pid for w in p._pool] 2780 # All pids should be assigned. See issue #7805. 2781 self.assertNotIn(None, origworkerpids) 2782 self.assertNotIn(None, finalworkerpids) 2783 # Finally, check that the worker pids have changed 2784 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids)) 2785 p.close() 2786 p.join() 2787 2788 def test_pool_worker_lifetime_early_close(self): 2789 # Issue #10332: closing a pool whose workers have limited lifetimes 2790 # before all the tasks completed would make join() hang. 2791 p = multiprocessing.Pool(3, maxtasksperchild=1) 2792 results = [] 2793 for i in range(6): 2794 results.append(p.apply_async(sqr, (i, 0.3))) 2795 p.close() 2796 p.join() 2797 # check the results 2798 for (j, res) in enumerate(results): 2799 self.assertEqual(res.get(), sqr(j)) 2800 2801 def test_worker_finalization_via_atexit_handler_of_multiprocessing(self): 2802 # tests cases against bpo-38744 and bpo-39360 2803 cmd = '''if 1: 2804 from multiprocessing import Pool 2805 problem = None 2806 class A: 2807 def __init__(self): 2808 self.pool = Pool(processes=1) 2809 def test(): 2810 global problem 2811 problem = A() 2812 problem.pool.map(float, tuple(range(10))) 2813 if __name__ == "__main__": 2814 test() 2815 ''' 2816 rc, out, err = test.support.script_helper.assert_python_ok('-c', cmd) 2817 self.assertEqual(rc, 0) 2818 2819# 2820# Test of creating a customized manager class 2821# 2822 2823from multiprocessing.managers import BaseManager, BaseProxy, RemoteError 2824 2825class FooBar(object): 2826 def f(self): 2827 return 'f()' 2828 def g(self): 2829 raise ValueError 2830 def _h(self): 2831 return '_h()' 2832 2833def baz(): 2834 for i in range(10): 2835 yield i*i 2836 2837class IteratorProxy(BaseProxy): 2838 _exposed_ = ('__next__',) 2839 def __iter__(self): 2840 return self 2841 def __next__(self): 2842 return self._callmethod('__next__') 2843 2844class MyManager(BaseManager): 2845 pass 2846 2847MyManager.register('Foo', callable=FooBar) 2848MyManager.register('Bar', callable=FooBar, exposed=('f', '_h')) 2849MyManager.register('baz', callable=baz, proxytype=IteratorProxy) 2850 2851 2852class _TestMyManager(BaseTestCase): 2853 2854 ALLOWED_TYPES = ('manager',) 2855 2856 def test_mymanager(self): 2857 manager = MyManager() 2858 manager.start() 2859 self.common(manager) 2860 manager.shutdown() 2861 2862 # bpo-30356: BaseManager._finalize_manager() sends SIGTERM 2863 # to the manager process if it takes longer than 1 second to stop, 2864 # which happens on slow buildbots. 2865 self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM)) 2866 2867 def test_mymanager_context(self): 2868 with MyManager() as manager: 2869 self.common(manager) 2870 # bpo-30356: BaseManager._finalize_manager() sends SIGTERM 2871 # to the manager process if it takes longer than 1 second to stop, 2872 # which happens on slow buildbots. 2873 self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM)) 2874 2875 def test_mymanager_context_prestarted(self): 2876 manager = MyManager() 2877 manager.start() 2878 with manager: 2879 self.common(manager) 2880 self.assertEqual(manager._process.exitcode, 0) 2881 2882 def common(self, manager): 2883 foo = manager.Foo() 2884 bar = manager.Bar() 2885 baz = manager.baz() 2886 2887 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)] 2888 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)] 2889 2890 self.assertEqual(foo_methods, ['f', 'g']) 2891 self.assertEqual(bar_methods, ['f', '_h']) 2892 2893 self.assertEqual(foo.f(), 'f()') 2894 self.assertRaises(ValueError, foo.g) 2895 self.assertEqual(foo._callmethod('f'), 'f()') 2896 self.assertRaises(RemoteError, foo._callmethod, '_h') 2897 2898 self.assertEqual(bar.f(), 'f()') 2899 self.assertEqual(bar._h(), '_h()') 2900 self.assertEqual(bar._callmethod('f'), 'f()') 2901 self.assertEqual(bar._callmethod('_h'), '_h()') 2902 2903 self.assertEqual(list(baz), [i*i for i in range(10)]) 2904 2905 2906# 2907# Test of connecting to a remote server and using xmlrpclib for serialization 2908# 2909 2910_queue = pyqueue.Queue() 2911def get_queue(): 2912 return _queue 2913 2914class QueueManager(BaseManager): 2915 '''manager class used by server process''' 2916QueueManager.register('get_queue', callable=get_queue) 2917 2918class QueueManager2(BaseManager): 2919 '''manager class which specifies the same interface as QueueManager''' 2920QueueManager2.register('get_queue') 2921 2922 2923SERIALIZER = 'xmlrpclib' 2924 2925class _TestRemoteManager(BaseTestCase): 2926 2927 ALLOWED_TYPES = ('manager',) 2928 values = ['hello world', None, True, 2.25, 2929 'hall\xe5 v\xe4rlden', 2930 '\u043f\u0440\u0438\u0432\u0456\u0442 \u0441\u0432\u0456\u0442', 2931 b'hall\xe5 v\xe4rlden', 2932 ] 2933 result = values[:] 2934 2935 @classmethod 2936 def _putter(cls, address, authkey): 2937 manager = QueueManager2( 2938 address=address, authkey=authkey, serializer=SERIALIZER 2939 ) 2940 manager.connect() 2941 queue = manager.get_queue() 2942 # Note that xmlrpclib will deserialize object as a list not a tuple 2943 queue.put(tuple(cls.values)) 2944 2945 def test_remote(self): 2946 authkey = os.urandom(32) 2947 2948 manager = QueueManager( 2949 address=(socket_helper.HOST, 0), authkey=authkey, serializer=SERIALIZER 2950 ) 2951 manager.start() 2952 self.addCleanup(manager.shutdown) 2953 2954 p = self.Process(target=self._putter, args=(manager.address, authkey)) 2955 p.daemon = True 2956 p.start() 2957 2958 manager2 = QueueManager2( 2959 address=manager.address, authkey=authkey, serializer=SERIALIZER 2960 ) 2961 manager2.connect() 2962 queue = manager2.get_queue() 2963 2964 self.assertEqual(queue.get(), self.result) 2965 2966 # Because we are using xmlrpclib for serialization instead of 2967 # pickle this will cause a serialization error. 2968 self.assertRaises(Exception, queue.put, time.sleep) 2969 2970 # Make queue finalizer run before the server is stopped 2971 del queue 2972 2973 2974@hashlib_helper.requires_hashdigest('md5') 2975class _TestManagerRestart(BaseTestCase): 2976 2977 @classmethod 2978 def _putter(cls, address, authkey): 2979 manager = QueueManager( 2980 address=address, authkey=authkey, serializer=SERIALIZER) 2981 manager.connect() 2982 queue = manager.get_queue() 2983 queue.put('hello world') 2984 2985 def test_rapid_restart(self): 2986 authkey = os.urandom(32) 2987 manager = QueueManager( 2988 address=(socket_helper.HOST, 0), authkey=authkey, serializer=SERIALIZER) 2989 try: 2990 srvr = manager.get_server() 2991 addr = srvr.address 2992 # Close the connection.Listener socket which gets opened as a part 2993 # of manager.get_server(). It's not needed for the test. 2994 srvr.listener.close() 2995 manager.start() 2996 2997 p = self.Process(target=self._putter, args=(manager.address, authkey)) 2998 p.start() 2999 p.join() 3000 queue = manager.get_queue() 3001 self.assertEqual(queue.get(), 'hello world') 3002 del queue 3003 finally: 3004 if hasattr(manager, "shutdown"): 3005 manager.shutdown() 3006 3007 manager = QueueManager( 3008 address=addr, authkey=authkey, serializer=SERIALIZER) 3009 try: 3010 manager.start() 3011 self.addCleanup(manager.shutdown) 3012 except OSError as e: 3013 if e.errno != errno.EADDRINUSE: 3014 raise 3015 # Retry after some time, in case the old socket was lingering 3016 # (sporadic failure on buildbots) 3017 time.sleep(1.0) 3018 manager = QueueManager( 3019 address=addr, authkey=authkey, serializer=SERIALIZER) 3020 if hasattr(manager, "shutdown"): 3021 self.addCleanup(manager.shutdown) 3022 3023# 3024# 3025# 3026 3027SENTINEL = latin('') 3028 3029class _TestConnection(BaseTestCase): 3030 3031 ALLOWED_TYPES = ('processes', 'threads') 3032 3033 @classmethod 3034 def _echo(cls, conn): 3035 for msg in iter(conn.recv_bytes, SENTINEL): 3036 conn.send_bytes(msg) 3037 conn.close() 3038 3039 def test_connection(self): 3040 conn, child_conn = self.Pipe() 3041 3042 p = self.Process(target=self._echo, args=(child_conn,)) 3043 p.daemon = True 3044 p.start() 3045 3046 seq = [1, 2.25, None] 3047 msg = latin('hello world') 3048 longmsg = msg * 10 3049 arr = array.array('i', list(range(4))) 3050 3051 if self.TYPE == 'processes': 3052 self.assertEqual(type(conn.fileno()), int) 3053 3054 self.assertEqual(conn.send(seq), None) 3055 self.assertEqual(conn.recv(), seq) 3056 3057 self.assertEqual(conn.send_bytes(msg), None) 3058 self.assertEqual(conn.recv_bytes(), msg) 3059 3060 if self.TYPE == 'processes': 3061 buffer = array.array('i', [0]*10) 3062 expected = list(arr) + [0] * (10 - len(arr)) 3063 self.assertEqual(conn.send_bytes(arr), None) 3064 self.assertEqual(conn.recv_bytes_into(buffer), 3065 len(arr) * buffer.itemsize) 3066 self.assertEqual(list(buffer), expected) 3067 3068 buffer = array.array('i', [0]*10) 3069 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr)) 3070 self.assertEqual(conn.send_bytes(arr), None) 3071 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize), 3072 len(arr) * buffer.itemsize) 3073 self.assertEqual(list(buffer), expected) 3074 3075 buffer = bytearray(latin(' ' * 40)) 3076 self.assertEqual(conn.send_bytes(longmsg), None) 3077 try: 3078 res = conn.recv_bytes_into(buffer) 3079 except multiprocessing.BufferTooShort as e: 3080 self.assertEqual(e.args, (longmsg,)) 3081 else: 3082 self.fail('expected BufferTooShort, got %s' % res) 3083 3084 poll = TimingWrapper(conn.poll) 3085 3086 self.assertEqual(poll(), False) 3087 self.assertTimingAlmostEqual(poll.elapsed, 0) 3088 3089 self.assertEqual(poll(-1), False) 3090 self.assertTimingAlmostEqual(poll.elapsed, 0) 3091 3092 self.assertEqual(poll(TIMEOUT1), False) 3093 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1) 3094 3095 conn.send(None) 3096 time.sleep(.1) 3097 3098 self.assertEqual(poll(TIMEOUT1), True) 3099 self.assertTimingAlmostEqual(poll.elapsed, 0) 3100 3101 self.assertEqual(conn.recv(), None) 3102 3103 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb 3104 conn.send_bytes(really_big_msg) 3105 self.assertEqual(conn.recv_bytes(), really_big_msg) 3106 3107 conn.send_bytes(SENTINEL) # tell child to quit 3108 child_conn.close() 3109 3110 if self.TYPE == 'processes': 3111 self.assertEqual(conn.readable, True) 3112 self.assertEqual(conn.writable, True) 3113 self.assertRaises(EOFError, conn.recv) 3114 self.assertRaises(EOFError, conn.recv_bytes) 3115 3116 p.join() 3117 3118 def test_duplex_false(self): 3119 reader, writer = self.Pipe(duplex=False) 3120 self.assertEqual(writer.send(1), None) 3121 self.assertEqual(reader.recv(), 1) 3122 if self.TYPE == 'processes': 3123 self.assertEqual(reader.readable, True) 3124 self.assertEqual(reader.writable, False) 3125 self.assertEqual(writer.readable, False) 3126 self.assertEqual(writer.writable, True) 3127 self.assertRaises(OSError, reader.send, 2) 3128 self.assertRaises(OSError, writer.recv) 3129 self.assertRaises(OSError, writer.poll) 3130 3131 def test_spawn_close(self): 3132 # We test that a pipe connection can be closed by parent 3133 # process immediately after child is spawned. On Windows this 3134 # would have sometimes failed on old versions because 3135 # child_conn would be closed before the child got a chance to 3136 # duplicate it. 3137 conn, child_conn = self.Pipe() 3138 3139 p = self.Process(target=self._echo, args=(child_conn,)) 3140 p.daemon = True 3141 p.start() 3142 child_conn.close() # this might complete before child initializes 3143 3144 msg = latin('hello') 3145 conn.send_bytes(msg) 3146 self.assertEqual(conn.recv_bytes(), msg) 3147 3148 conn.send_bytes(SENTINEL) 3149 conn.close() 3150 p.join() 3151 3152 def test_sendbytes(self): 3153 if self.TYPE != 'processes': 3154 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 3155 3156 msg = latin('abcdefghijklmnopqrstuvwxyz') 3157 a, b = self.Pipe() 3158 3159 a.send_bytes(msg) 3160 self.assertEqual(b.recv_bytes(), msg) 3161 3162 a.send_bytes(msg, 5) 3163 self.assertEqual(b.recv_bytes(), msg[5:]) 3164 3165 a.send_bytes(msg, 7, 8) 3166 self.assertEqual(b.recv_bytes(), msg[7:7+8]) 3167 3168 a.send_bytes(msg, 26) 3169 self.assertEqual(b.recv_bytes(), latin('')) 3170 3171 a.send_bytes(msg, 26, 0) 3172 self.assertEqual(b.recv_bytes(), latin('')) 3173 3174 self.assertRaises(ValueError, a.send_bytes, msg, 27) 3175 3176 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5) 3177 3178 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1) 3179 3180 self.assertRaises(ValueError, a.send_bytes, msg, -1) 3181 3182 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1) 3183 3184 @classmethod 3185 def _is_fd_assigned(cls, fd): 3186 try: 3187 os.fstat(fd) 3188 except OSError as e: 3189 if e.errno == errno.EBADF: 3190 return False 3191 raise 3192 else: 3193 return True 3194 3195 @classmethod 3196 def _writefd(cls, conn, data, create_dummy_fds=False): 3197 if create_dummy_fds: 3198 for i in range(0, 256): 3199 if not cls._is_fd_assigned(i): 3200 os.dup2(conn.fileno(), i) 3201 fd = reduction.recv_handle(conn) 3202 if msvcrt: 3203 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY) 3204 os.write(fd, data) 3205 os.close(fd) 3206 3207 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 3208 def test_fd_transfer(self): 3209 if self.TYPE != 'processes': 3210 self.skipTest("only makes sense with processes") 3211 conn, child_conn = self.Pipe(duplex=True) 3212 3213 p = self.Process(target=self._writefd, args=(child_conn, b"foo")) 3214 p.daemon = True 3215 p.start() 3216 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 3217 with open(os_helper.TESTFN, "wb") as f: 3218 fd = f.fileno() 3219 if msvcrt: 3220 fd = msvcrt.get_osfhandle(fd) 3221 reduction.send_handle(conn, fd, p.pid) 3222 p.join() 3223 with open(os_helper.TESTFN, "rb") as f: 3224 self.assertEqual(f.read(), b"foo") 3225 3226 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 3227 @unittest.skipIf(sys.platform == "win32", 3228 "test semantics don't make sense on Windows") 3229 @unittest.skipIf(MAXFD <= 256, 3230 "largest assignable fd number is too small") 3231 @unittest.skipUnless(hasattr(os, "dup2"), 3232 "test needs os.dup2()") 3233 def test_large_fd_transfer(self): 3234 # With fd > 256 (issue #11657) 3235 if self.TYPE != 'processes': 3236 self.skipTest("only makes sense with processes") 3237 conn, child_conn = self.Pipe(duplex=True) 3238 3239 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True)) 3240 p.daemon = True 3241 p.start() 3242 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 3243 with open(os_helper.TESTFN, "wb") as f: 3244 fd = f.fileno() 3245 for newfd in range(256, MAXFD): 3246 if not self._is_fd_assigned(newfd): 3247 break 3248 else: 3249 self.fail("could not find an unassigned large file descriptor") 3250 os.dup2(fd, newfd) 3251 try: 3252 reduction.send_handle(conn, newfd, p.pid) 3253 finally: 3254 os.close(newfd) 3255 p.join() 3256 with open(os_helper.TESTFN, "rb") as f: 3257 self.assertEqual(f.read(), b"bar") 3258 3259 @classmethod 3260 def _send_data_without_fd(self, conn): 3261 os.write(conn.fileno(), b"\0") 3262 3263 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 3264 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows") 3265 def test_missing_fd_transfer(self): 3266 # Check that exception is raised when received data is not 3267 # accompanied by a file descriptor in ancillary data. 3268 if self.TYPE != 'processes': 3269 self.skipTest("only makes sense with processes") 3270 conn, child_conn = self.Pipe(duplex=True) 3271 3272 p = self.Process(target=self._send_data_without_fd, args=(child_conn,)) 3273 p.daemon = True 3274 p.start() 3275 self.assertRaises(RuntimeError, reduction.recv_handle, conn) 3276 p.join() 3277 3278 def test_context(self): 3279 a, b = self.Pipe() 3280 3281 with a, b: 3282 a.send(1729) 3283 self.assertEqual(b.recv(), 1729) 3284 if self.TYPE == 'processes': 3285 self.assertFalse(a.closed) 3286 self.assertFalse(b.closed) 3287 3288 if self.TYPE == 'processes': 3289 self.assertTrue(a.closed) 3290 self.assertTrue(b.closed) 3291 self.assertRaises(OSError, a.recv) 3292 self.assertRaises(OSError, b.recv) 3293 3294class _TestListener(BaseTestCase): 3295 3296 ALLOWED_TYPES = ('processes',) 3297 3298 def test_multiple_bind(self): 3299 for family in self.connection.families: 3300 l = self.connection.Listener(family=family) 3301 self.addCleanup(l.close) 3302 self.assertRaises(OSError, self.connection.Listener, 3303 l.address, family) 3304 3305 def test_context(self): 3306 with self.connection.Listener() as l: 3307 with self.connection.Client(l.address) as c: 3308 with l.accept() as d: 3309 c.send(1729) 3310 self.assertEqual(d.recv(), 1729) 3311 3312 if self.TYPE == 'processes': 3313 self.assertRaises(OSError, l.accept) 3314 3315 @unittest.skipUnless(util.abstract_sockets_supported, 3316 "test needs abstract socket support") 3317 def test_abstract_socket(self): 3318 with self.connection.Listener("\0something") as listener: 3319 with self.connection.Client(listener.address) as client: 3320 with listener.accept() as d: 3321 client.send(1729) 3322 self.assertEqual(d.recv(), 1729) 3323 3324 if self.TYPE == 'processes': 3325 self.assertRaises(OSError, listener.accept) 3326 3327 3328class _TestListenerClient(BaseTestCase): 3329 3330 ALLOWED_TYPES = ('processes', 'threads') 3331 3332 @classmethod 3333 def _test(cls, address): 3334 conn = cls.connection.Client(address) 3335 conn.send('hello') 3336 conn.close() 3337 3338 def test_listener_client(self): 3339 for family in self.connection.families: 3340 l = self.connection.Listener(family=family) 3341 p = self.Process(target=self._test, args=(l.address,)) 3342 p.daemon = True 3343 p.start() 3344 conn = l.accept() 3345 self.assertEqual(conn.recv(), 'hello') 3346 p.join() 3347 l.close() 3348 3349 def test_issue14725(self): 3350 l = self.connection.Listener() 3351 p = self.Process(target=self._test, args=(l.address,)) 3352 p.daemon = True 3353 p.start() 3354 time.sleep(1) 3355 # On Windows the client process should by now have connected, 3356 # written data and closed the pipe handle by now. This causes 3357 # ConnectNamdedPipe() to fail with ERROR_NO_DATA. See Issue 3358 # 14725. 3359 conn = l.accept() 3360 self.assertEqual(conn.recv(), 'hello') 3361 conn.close() 3362 p.join() 3363 l.close() 3364 3365 def test_issue16955(self): 3366 for fam in self.connection.families: 3367 l = self.connection.Listener(family=fam) 3368 c = self.connection.Client(l.address) 3369 a = l.accept() 3370 a.send_bytes(b"hello") 3371 self.assertTrue(c.poll(1)) 3372 a.close() 3373 c.close() 3374 l.close() 3375 3376class _TestPoll(BaseTestCase): 3377 3378 ALLOWED_TYPES = ('processes', 'threads') 3379 3380 def test_empty_string(self): 3381 a, b = self.Pipe() 3382 self.assertEqual(a.poll(), False) 3383 b.send_bytes(b'') 3384 self.assertEqual(a.poll(), True) 3385 self.assertEqual(a.poll(), True) 3386 3387 @classmethod 3388 def _child_strings(cls, conn, strings): 3389 for s in strings: 3390 time.sleep(0.1) 3391 conn.send_bytes(s) 3392 conn.close() 3393 3394 def test_strings(self): 3395 strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop') 3396 a, b = self.Pipe() 3397 p = self.Process(target=self._child_strings, args=(b, strings)) 3398 p.start() 3399 3400 for s in strings: 3401 for i in range(200): 3402 if a.poll(0.01): 3403 break 3404 x = a.recv_bytes() 3405 self.assertEqual(s, x) 3406 3407 p.join() 3408 3409 @classmethod 3410 def _child_boundaries(cls, r): 3411 # Polling may "pull" a message in to the child process, but we 3412 # don't want it to pull only part of a message, as that would 3413 # corrupt the pipe for any other processes which might later 3414 # read from it. 3415 r.poll(5) 3416 3417 def test_boundaries(self): 3418 r, w = self.Pipe(False) 3419 p = self.Process(target=self._child_boundaries, args=(r,)) 3420 p.start() 3421 time.sleep(2) 3422 L = [b"first", b"second"] 3423 for obj in L: 3424 w.send_bytes(obj) 3425 w.close() 3426 p.join() 3427 self.assertIn(r.recv_bytes(), L) 3428 3429 @classmethod 3430 def _child_dont_merge(cls, b): 3431 b.send_bytes(b'a') 3432 b.send_bytes(b'b') 3433 b.send_bytes(b'cd') 3434 3435 def test_dont_merge(self): 3436 a, b = self.Pipe() 3437 self.assertEqual(a.poll(0.0), False) 3438 self.assertEqual(a.poll(0.1), False) 3439 3440 p = self.Process(target=self._child_dont_merge, args=(b,)) 3441 p.start() 3442 3443 self.assertEqual(a.recv_bytes(), b'a') 3444 self.assertEqual(a.poll(1.0), True) 3445 self.assertEqual(a.poll(1.0), True) 3446 self.assertEqual(a.recv_bytes(), b'b') 3447 self.assertEqual(a.poll(1.0), True) 3448 self.assertEqual(a.poll(1.0), True) 3449 self.assertEqual(a.poll(0.0), True) 3450 self.assertEqual(a.recv_bytes(), b'cd') 3451 3452 p.join() 3453 3454# 3455# Test of sending connection and socket objects between processes 3456# 3457 3458@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 3459@hashlib_helper.requires_hashdigest('md5') 3460class _TestPicklingConnections(BaseTestCase): 3461 3462 ALLOWED_TYPES = ('processes',) 3463 3464 @classmethod 3465 def tearDownClass(cls): 3466 from multiprocessing import resource_sharer 3467 resource_sharer.stop(timeout=support.LONG_TIMEOUT) 3468 3469 @classmethod 3470 def _listener(cls, conn, families): 3471 for fam in families: 3472 l = cls.connection.Listener(family=fam) 3473 conn.send(l.address) 3474 new_conn = l.accept() 3475 conn.send(new_conn) 3476 new_conn.close() 3477 l.close() 3478 3479 l = socket.create_server((socket_helper.HOST, 0)) 3480 conn.send(l.getsockname()) 3481 new_conn, addr = l.accept() 3482 conn.send(new_conn) 3483 new_conn.close() 3484 l.close() 3485 3486 conn.recv() 3487 3488 @classmethod 3489 def _remote(cls, conn): 3490 for (address, msg) in iter(conn.recv, None): 3491 client = cls.connection.Client(address) 3492 client.send(msg.upper()) 3493 client.close() 3494 3495 address, msg = conn.recv() 3496 client = socket.socket() 3497 client.connect(address) 3498 client.sendall(msg.upper()) 3499 client.close() 3500 3501 conn.close() 3502 3503 def test_pickling(self): 3504 families = self.connection.families 3505 3506 lconn, lconn0 = self.Pipe() 3507 lp = self.Process(target=self._listener, args=(lconn0, families)) 3508 lp.daemon = True 3509 lp.start() 3510 lconn0.close() 3511 3512 rconn, rconn0 = self.Pipe() 3513 rp = self.Process(target=self._remote, args=(rconn0,)) 3514 rp.daemon = True 3515 rp.start() 3516 rconn0.close() 3517 3518 for fam in families: 3519 msg = ('This connection uses family %s' % fam).encode('ascii') 3520 address = lconn.recv() 3521 rconn.send((address, msg)) 3522 new_conn = lconn.recv() 3523 self.assertEqual(new_conn.recv(), msg.upper()) 3524 3525 rconn.send(None) 3526 3527 msg = latin('This connection uses a normal socket') 3528 address = lconn.recv() 3529 rconn.send((address, msg)) 3530 new_conn = lconn.recv() 3531 buf = [] 3532 while True: 3533 s = new_conn.recv(100) 3534 if not s: 3535 break 3536 buf.append(s) 3537 buf = b''.join(buf) 3538 self.assertEqual(buf, msg.upper()) 3539 new_conn.close() 3540 3541 lconn.send(None) 3542 3543 rconn.close() 3544 lconn.close() 3545 3546 lp.join() 3547 rp.join() 3548 3549 @classmethod 3550 def child_access(cls, conn): 3551 w = conn.recv() 3552 w.send('all is well') 3553 w.close() 3554 3555 r = conn.recv() 3556 msg = r.recv() 3557 conn.send(msg*2) 3558 3559 conn.close() 3560 3561 def test_access(self): 3562 # On Windows, if we do not specify a destination pid when 3563 # using DupHandle then we need to be careful to use the 3564 # correct access flags for DuplicateHandle(), or else 3565 # DupHandle.detach() will raise PermissionError. For example, 3566 # for a read only pipe handle we should use 3567 # access=FILE_GENERIC_READ. (Unfortunately 3568 # DUPLICATE_SAME_ACCESS does not work.) 3569 conn, child_conn = self.Pipe() 3570 p = self.Process(target=self.child_access, args=(child_conn,)) 3571 p.daemon = True 3572 p.start() 3573 child_conn.close() 3574 3575 r, w = self.Pipe(duplex=False) 3576 conn.send(w) 3577 w.close() 3578 self.assertEqual(r.recv(), 'all is well') 3579 r.close() 3580 3581 r, w = self.Pipe(duplex=False) 3582 conn.send(r) 3583 r.close() 3584 w.send('foobar') 3585 w.close() 3586 self.assertEqual(conn.recv(), 'foobar'*2) 3587 3588 p.join() 3589 3590# 3591# 3592# 3593 3594class _TestHeap(BaseTestCase): 3595 3596 ALLOWED_TYPES = ('processes',) 3597 3598 def setUp(self): 3599 super().setUp() 3600 # Make pristine heap for these tests 3601 self.old_heap = multiprocessing.heap.BufferWrapper._heap 3602 multiprocessing.heap.BufferWrapper._heap = multiprocessing.heap.Heap() 3603 3604 def tearDown(self): 3605 multiprocessing.heap.BufferWrapper._heap = self.old_heap 3606 super().tearDown() 3607 3608 def test_heap(self): 3609 iterations = 5000 3610 maxblocks = 50 3611 blocks = [] 3612 3613 # get the heap object 3614 heap = multiprocessing.heap.BufferWrapper._heap 3615 heap._DISCARD_FREE_SPACE_LARGER_THAN = 0 3616 3617 # create and destroy lots of blocks of different sizes 3618 for i in range(iterations): 3619 size = int(random.lognormvariate(0, 1) * 1000) 3620 b = multiprocessing.heap.BufferWrapper(size) 3621 blocks.append(b) 3622 if len(blocks) > maxblocks: 3623 i = random.randrange(maxblocks) 3624 del blocks[i] 3625 del b 3626 3627 # verify the state of the heap 3628 with heap._lock: 3629 all = [] 3630 free = 0 3631 occupied = 0 3632 for L in list(heap._len_to_seq.values()): 3633 # count all free blocks in arenas 3634 for arena, start, stop in L: 3635 all.append((heap._arenas.index(arena), start, stop, 3636 stop-start, 'free')) 3637 free += (stop-start) 3638 for arena, arena_blocks in heap._allocated_blocks.items(): 3639 # count all allocated blocks in arenas 3640 for start, stop in arena_blocks: 3641 all.append((heap._arenas.index(arena), start, stop, 3642 stop-start, 'occupied')) 3643 occupied += (stop-start) 3644 3645 self.assertEqual(free + occupied, 3646 sum(arena.size for arena in heap._arenas)) 3647 3648 all.sort() 3649 3650 for i in range(len(all)-1): 3651 (arena, start, stop) = all[i][:3] 3652 (narena, nstart, nstop) = all[i+1][:3] 3653 if arena != narena: 3654 # Two different arenas 3655 self.assertEqual(stop, heap._arenas[arena].size) # last block 3656 self.assertEqual(nstart, 0) # first block 3657 else: 3658 # Same arena: two adjacent blocks 3659 self.assertEqual(stop, nstart) 3660 3661 # test free'ing all blocks 3662 random.shuffle(blocks) 3663 while blocks: 3664 blocks.pop() 3665 3666 self.assertEqual(heap._n_frees, heap._n_mallocs) 3667 self.assertEqual(len(heap._pending_free_blocks), 0) 3668 self.assertEqual(len(heap._arenas), 0) 3669 self.assertEqual(len(heap._allocated_blocks), 0, heap._allocated_blocks) 3670 self.assertEqual(len(heap._len_to_seq), 0) 3671 3672 def test_free_from_gc(self): 3673 # Check that freeing of blocks by the garbage collector doesn't deadlock 3674 # (issue #12352). 3675 # Make sure the GC is enabled, and set lower collection thresholds to 3676 # make collections more frequent (and increase the probability of 3677 # deadlock). 3678 if not gc.isenabled(): 3679 gc.enable() 3680 self.addCleanup(gc.disable) 3681 thresholds = gc.get_threshold() 3682 self.addCleanup(gc.set_threshold, *thresholds) 3683 gc.set_threshold(10) 3684 3685 # perform numerous block allocations, with cyclic references to make 3686 # sure objects are collected asynchronously by the gc 3687 for i in range(5000): 3688 a = multiprocessing.heap.BufferWrapper(1) 3689 b = multiprocessing.heap.BufferWrapper(1) 3690 # circular references 3691 a.buddy = b 3692 b.buddy = a 3693 3694# 3695# 3696# 3697 3698class _Foo(Structure): 3699 _fields_ = [ 3700 ('x', c_int), 3701 ('y', c_double), 3702 ('z', c_longlong,) 3703 ] 3704 3705class _TestSharedCTypes(BaseTestCase): 3706 3707 ALLOWED_TYPES = ('processes',) 3708 3709 def setUp(self): 3710 if not HAS_SHAREDCTYPES: 3711 self.skipTest("requires multiprocessing.sharedctypes") 3712 3713 @classmethod 3714 def _double(cls, x, y, z, foo, arr, string): 3715 x.value *= 2 3716 y.value *= 2 3717 z.value *= 2 3718 foo.x *= 2 3719 foo.y *= 2 3720 string.value *= 2 3721 for i in range(len(arr)): 3722 arr[i] *= 2 3723 3724 def test_sharedctypes(self, lock=False): 3725 x = Value('i', 7, lock=lock) 3726 y = Value(c_double, 1.0/3.0, lock=lock) 3727 z = Value(c_longlong, 2 ** 33, lock=lock) 3728 foo = Value(_Foo, 3, 2, lock=lock) 3729 arr = self.Array('d', list(range(10)), lock=lock) 3730 string = self.Array('c', 20, lock=lock) 3731 string.value = latin('hello') 3732 3733 p = self.Process(target=self._double, args=(x, y, z, foo, arr, string)) 3734 p.daemon = True 3735 p.start() 3736 p.join() 3737 3738 self.assertEqual(x.value, 14) 3739 self.assertAlmostEqual(y.value, 2.0/3.0) 3740 self.assertEqual(z.value, 2 ** 34) 3741 self.assertEqual(foo.x, 6) 3742 self.assertAlmostEqual(foo.y, 4.0) 3743 for i in range(10): 3744 self.assertAlmostEqual(arr[i], i*2) 3745 self.assertEqual(string.value, latin('hellohello')) 3746 3747 def test_synchronize(self): 3748 self.test_sharedctypes(lock=True) 3749 3750 def test_copy(self): 3751 foo = _Foo(2, 5.0, 2 ** 33) 3752 bar = copy(foo) 3753 foo.x = 0 3754 foo.y = 0 3755 foo.z = 0 3756 self.assertEqual(bar.x, 2) 3757 self.assertAlmostEqual(bar.y, 5.0) 3758 self.assertEqual(bar.z, 2 ** 33) 3759 3760 3761@unittest.skipUnless(HAS_SHMEM, "requires multiprocessing.shared_memory") 3762@hashlib_helper.requires_hashdigest('md5') 3763class _TestSharedMemory(BaseTestCase): 3764 3765 ALLOWED_TYPES = ('processes',) 3766 3767 @staticmethod 3768 def _attach_existing_shmem_then_write(shmem_name_or_obj, binary_data): 3769 if isinstance(shmem_name_or_obj, str): 3770 local_sms = shared_memory.SharedMemory(shmem_name_or_obj) 3771 else: 3772 local_sms = shmem_name_or_obj 3773 local_sms.buf[:len(binary_data)] = binary_data 3774 local_sms.close() 3775 3776 def _new_shm_name(self, prefix): 3777 # Add a PID to the name of a POSIX shared memory object to allow 3778 # running multiprocessing tests (test_multiprocessing_fork, 3779 # test_multiprocessing_spawn, etc) in parallel. 3780 return prefix + str(os.getpid()) 3781 3782 def test_shared_memory_basics(self): 3783 name_tsmb = self._new_shm_name('test01_tsmb') 3784 sms = shared_memory.SharedMemory(name_tsmb, create=True, size=512) 3785 self.addCleanup(sms.unlink) 3786 3787 # Verify attributes are readable. 3788 self.assertEqual(sms.name, name_tsmb) 3789 self.assertGreaterEqual(sms.size, 512) 3790 self.assertGreaterEqual(len(sms.buf), sms.size) 3791 3792 # Verify __repr__ 3793 self.assertIn(sms.name, str(sms)) 3794 self.assertIn(str(sms.size), str(sms)) 3795 3796 # Modify contents of shared memory segment through memoryview. 3797 sms.buf[0] = 42 3798 self.assertEqual(sms.buf[0], 42) 3799 3800 # Attach to existing shared memory segment. 3801 also_sms = shared_memory.SharedMemory(name_tsmb) 3802 self.assertEqual(also_sms.buf[0], 42) 3803 also_sms.close() 3804 3805 # Attach to existing shared memory segment but specify a new size. 3806 same_sms = shared_memory.SharedMemory(name_tsmb, size=20*sms.size) 3807 self.assertLess(same_sms.size, 20*sms.size) # Size was ignored. 3808 same_sms.close() 3809 3810 # Creating Shared Memory Segment with -ve size 3811 with self.assertRaises(ValueError): 3812 shared_memory.SharedMemory(create=True, size=-2) 3813 3814 # Attaching Shared Memory Segment without a name 3815 with self.assertRaises(ValueError): 3816 shared_memory.SharedMemory(create=False) 3817 3818 # Test if shared memory segment is created properly, 3819 # when _make_filename returns an existing shared memory segment name 3820 with unittest.mock.patch( 3821 'multiprocessing.shared_memory._make_filename') as mock_make_filename: 3822 3823 NAME_PREFIX = shared_memory._SHM_NAME_PREFIX 3824 names = [self._new_shm_name('test01_fn'), self._new_shm_name('test02_fn')] 3825 # Prepend NAME_PREFIX which can be '/psm_' or 'wnsm_', necessary 3826 # because some POSIX compliant systems require name to start with / 3827 names = [NAME_PREFIX + name for name in names] 3828 3829 mock_make_filename.side_effect = names 3830 shm1 = shared_memory.SharedMemory(create=True, size=1) 3831 self.addCleanup(shm1.unlink) 3832 self.assertEqual(shm1._name, names[0]) 3833 3834 mock_make_filename.side_effect = names 3835 shm2 = shared_memory.SharedMemory(create=True, size=1) 3836 self.addCleanup(shm2.unlink) 3837 self.assertEqual(shm2._name, names[1]) 3838 3839 if shared_memory._USE_POSIX: 3840 # Posix Shared Memory can only be unlinked once. Here we 3841 # test an implementation detail that is not observed across 3842 # all supported platforms (since WindowsNamedSharedMemory 3843 # manages unlinking on its own and unlink() does nothing). 3844 # True release of shared memory segment does not necessarily 3845 # happen until process exits, depending on the OS platform. 3846 name_dblunlink = self._new_shm_name('test01_dblunlink') 3847 sms_uno = shared_memory.SharedMemory( 3848 name_dblunlink, 3849 create=True, 3850 size=5000 3851 ) 3852 with self.assertRaises(FileNotFoundError): 3853 try: 3854 self.assertGreaterEqual(sms_uno.size, 5000) 3855 3856 sms_duo = shared_memory.SharedMemory(name_dblunlink) 3857 sms_duo.unlink() # First shm_unlink() call. 3858 sms_duo.close() 3859 sms_uno.close() 3860 3861 finally: 3862 sms_uno.unlink() # A second shm_unlink() call is bad. 3863 3864 with self.assertRaises(FileExistsError): 3865 # Attempting to create a new shared memory segment with a 3866 # name that is already in use triggers an exception. 3867 there_can_only_be_one_sms = shared_memory.SharedMemory( 3868 name_tsmb, 3869 create=True, 3870 size=512 3871 ) 3872 3873 if shared_memory._USE_POSIX: 3874 # Requesting creation of a shared memory segment with the option 3875 # to attach to an existing segment, if that name is currently in 3876 # use, should not trigger an exception. 3877 # Note: Using a smaller size could possibly cause truncation of 3878 # the existing segment but is OS platform dependent. In the 3879 # case of MacOS/darwin, requesting a smaller size is disallowed. 3880 class OptionalAttachSharedMemory(shared_memory.SharedMemory): 3881 _flags = os.O_CREAT | os.O_RDWR 3882 ok_if_exists_sms = OptionalAttachSharedMemory(name_tsmb) 3883 self.assertEqual(ok_if_exists_sms.size, sms.size) 3884 ok_if_exists_sms.close() 3885 3886 # Attempting to attach to an existing shared memory segment when 3887 # no segment exists with the supplied name triggers an exception. 3888 with self.assertRaises(FileNotFoundError): 3889 nonexisting_sms = shared_memory.SharedMemory('test01_notthere') 3890 nonexisting_sms.unlink() # Error should occur on prior line. 3891 3892 sms.close() 3893 3894 def test_shared_memory_recreate(self): 3895 # Test if shared memory segment is created properly, 3896 # when _make_filename returns an existing shared memory segment name 3897 with unittest.mock.patch( 3898 'multiprocessing.shared_memory._make_filename') as mock_make_filename: 3899 3900 NAME_PREFIX = shared_memory._SHM_NAME_PREFIX 3901 names = ['test01_fn', 'test02_fn'] 3902 # Prepend NAME_PREFIX which can be '/psm_' or 'wnsm_', necessary 3903 # because some POSIX compliant systems require name to start with / 3904 names = [NAME_PREFIX + name for name in names] 3905 3906 mock_make_filename.side_effect = names 3907 shm1 = shared_memory.SharedMemory(create=True, size=1) 3908 self.addCleanup(shm1.unlink) 3909 self.assertEqual(shm1._name, names[0]) 3910 3911 mock_make_filename.side_effect = names 3912 shm2 = shared_memory.SharedMemory(create=True, size=1) 3913 self.addCleanup(shm2.unlink) 3914 self.assertEqual(shm2._name, names[1]) 3915 3916 def test_invalid_shared_memory_cration(self): 3917 # Test creating a shared memory segment with negative size 3918 with self.assertRaises(ValueError): 3919 sms_invalid = shared_memory.SharedMemory(create=True, size=-1) 3920 3921 # Test creating a shared memory segment with size 0 3922 with self.assertRaises(ValueError): 3923 sms_invalid = shared_memory.SharedMemory(create=True, size=0) 3924 3925 # Test creating a shared memory segment without size argument 3926 with self.assertRaises(ValueError): 3927 sms_invalid = shared_memory.SharedMemory(create=True) 3928 3929 def test_shared_memory_pickle_unpickle(self): 3930 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 3931 with self.subTest(proto=proto): 3932 sms = shared_memory.SharedMemory(create=True, size=512) 3933 self.addCleanup(sms.unlink) 3934 sms.buf[0:6] = b'pickle' 3935 3936 # Test pickling 3937 pickled_sms = pickle.dumps(sms, protocol=proto) 3938 3939 # Test unpickling 3940 sms2 = pickle.loads(pickled_sms) 3941 self.assertIsInstance(sms2, shared_memory.SharedMemory) 3942 self.assertEqual(sms.name, sms2.name) 3943 self.assertEqual(bytes(sms.buf[0:6]), b'pickle') 3944 self.assertEqual(bytes(sms2.buf[0:6]), b'pickle') 3945 3946 # Test that unpickled version is still the same SharedMemory 3947 sms.buf[0:6] = b'newval' 3948 self.assertEqual(bytes(sms.buf[0:6]), b'newval') 3949 self.assertEqual(bytes(sms2.buf[0:6]), b'newval') 3950 3951 sms2.buf[0:6] = b'oldval' 3952 self.assertEqual(bytes(sms.buf[0:6]), b'oldval') 3953 self.assertEqual(bytes(sms2.buf[0:6]), b'oldval') 3954 3955 def test_shared_memory_pickle_unpickle_dead_object(self): 3956 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 3957 with self.subTest(proto=proto): 3958 sms = shared_memory.SharedMemory(create=True, size=512) 3959 sms.buf[0:6] = b'pickle' 3960 pickled_sms = pickle.dumps(sms, protocol=proto) 3961 3962 # Now, we are going to kill the original object. 3963 # So, unpickled one won't be able to attach to it. 3964 sms.close() 3965 sms.unlink() 3966 3967 with self.assertRaises(FileNotFoundError): 3968 pickle.loads(pickled_sms) 3969 3970 def test_shared_memory_across_processes(self): 3971 # bpo-40135: don't define shared memory block's name in case of 3972 # the failure when we run multiprocessing tests in parallel. 3973 sms = shared_memory.SharedMemory(create=True, size=512) 3974 self.addCleanup(sms.unlink) 3975 3976 # Verify remote attachment to existing block by name is working. 3977 p = self.Process( 3978 target=self._attach_existing_shmem_then_write, 3979 args=(sms.name, b'howdy') 3980 ) 3981 p.daemon = True 3982 p.start() 3983 p.join() 3984 self.assertEqual(bytes(sms.buf[:5]), b'howdy') 3985 3986 # Verify pickling of SharedMemory instance also works. 3987 p = self.Process( 3988 target=self._attach_existing_shmem_then_write, 3989 args=(sms, b'HELLO') 3990 ) 3991 p.daemon = True 3992 p.start() 3993 p.join() 3994 self.assertEqual(bytes(sms.buf[:5]), b'HELLO') 3995 3996 sms.close() 3997 3998 @unittest.skipIf(os.name != "posix", "not feasible in non-posix platforms") 3999 def test_shared_memory_SharedMemoryServer_ignores_sigint(self): 4000 # bpo-36368: protect SharedMemoryManager server process from 4001 # KeyboardInterrupt signals. 4002 smm = multiprocessing.managers.SharedMemoryManager() 4003 smm.start() 4004 4005 # make sure the manager works properly at the beginning 4006 sl = smm.ShareableList(range(10)) 4007 4008 # the manager's server should ignore KeyboardInterrupt signals, and 4009 # maintain its connection with the current process, and success when 4010 # asked to deliver memory segments. 4011 os.kill(smm._process.pid, signal.SIGINT) 4012 4013 sl2 = smm.ShareableList(range(10)) 4014 4015 # test that the custom signal handler registered in the Manager does 4016 # not affect signal handling in the parent process. 4017 with self.assertRaises(KeyboardInterrupt): 4018 os.kill(os.getpid(), signal.SIGINT) 4019 4020 smm.shutdown() 4021 4022 @unittest.skipIf(os.name != "posix", "resource_tracker is posix only") 4023 def test_shared_memory_SharedMemoryManager_reuses_resource_tracker(self): 4024 # bpo-36867: test that a SharedMemoryManager uses the 4025 # same resource_tracker process as its parent. 4026 cmd = '''if 1: 4027 from multiprocessing.managers import SharedMemoryManager 4028 4029 4030 smm = SharedMemoryManager() 4031 smm.start() 4032 sl = smm.ShareableList(range(10)) 4033 smm.shutdown() 4034 ''' 4035 rc, out, err = test.support.script_helper.assert_python_ok('-c', cmd) 4036 4037 # Before bpo-36867 was fixed, a SharedMemoryManager not using the same 4038 # resource_tracker process as its parent would make the parent's 4039 # tracker complain about sl being leaked even though smm.shutdown() 4040 # properly released sl. 4041 self.assertFalse(err) 4042 4043 def test_shared_memory_SharedMemoryManager_basics(self): 4044 smm1 = multiprocessing.managers.SharedMemoryManager() 4045 with self.assertRaises(ValueError): 4046 smm1.SharedMemory(size=9) # Fails if SharedMemoryServer not started 4047 smm1.start() 4048 lol = [ smm1.ShareableList(range(i)) for i in range(5, 10) ] 4049 lom = [ smm1.SharedMemory(size=j) for j in range(32, 128, 16) ] 4050 doppleganger_list0 = shared_memory.ShareableList(name=lol[0].shm.name) 4051 self.assertEqual(len(doppleganger_list0), 5) 4052 doppleganger_shm0 = shared_memory.SharedMemory(name=lom[0].name) 4053 self.assertGreaterEqual(len(doppleganger_shm0.buf), 32) 4054 held_name = lom[0].name 4055 smm1.shutdown() 4056 if sys.platform != "win32": 4057 # Calls to unlink() have no effect on Windows platform; shared 4058 # memory will only be released once final process exits. 4059 with self.assertRaises(FileNotFoundError): 4060 # No longer there to be attached to again. 4061 absent_shm = shared_memory.SharedMemory(name=held_name) 4062 4063 with multiprocessing.managers.SharedMemoryManager() as smm2: 4064 sl = smm2.ShareableList("howdy") 4065 shm = smm2.SharedMemory(size=128) 4066 held_name = sl.shm.name 4067 if sys.platform != "win32": 4068 with self.assertRaises(FileNotFoundError): 4069 # No longer there to be attached to again. 4070 absent_sl = shared_memory.ShareableList(name=held_name) 4071 4072 4073 def test_shared_memory_ShareableList_basics(self): 4074 sl = shared_memory.ShareableList( 4075 ['howdy', b'HoWdY', -273.154, 100, None, True, 42] 4076 ) 4077 self.addCleanup(sl.shm.unlink) 4078 4079 # Verify __repr__ 4080 self.assertIn(sl.shm.name, str(sl)) 4081 self.assertIn(str(list(sl)), str(sl)) 4082 4083 # Index Out of Range (get) 4084 with self.assertRaises(IndexError): 4085 sl[7] 4086 4087 # Index Out of Range (set) 4088 with self.assertRaises(IndexError): 4089 sl[7] = 2 4090 4091 # Assign value without format change (str -> str) 4092 current_format = sl._get_packing_format(0) 4093 sl[0] = 'howdy' 4094 self.assertEqual(current_format, sl._get_packing_format(0)) 4095 4096 # Verify attributes are readable. 4097 self.assertEqual(sl.format, '8s8sdqxxxxxx?xxxxxxxx?q') 4098 4099 # Exercise len(). 4100 self.assertEqual(len(sl), 7) 4101 4102 # Exercise index(). 4103 with warnings.catch_warnings(): 4104 # Suppress BytesWarning when comparing against b'HoWdY'. 4105 warnings.simplefilter('ignore') 4106 with self.assertRaises(ValueError): 4107 sl.index('100') 4108 self.assertEqual(sl.index(100), 3) 4109 4110 # Exercise retrieving individual values. 4111 self.assertEqual(sl[0], 'howdy') 4112 self.assertEqual(sl[-2], True) 4113 4114 # Exercise iterability. 4115 self.assertEqual( 4116 tuple(sl), 4117 ('howdy', b'HoWdY', -273.154, 100, None, True, 42) 4118 ) 4119 4120 # Exercise modifying individual values. 4121 sl[3] = 42 4122 self.assertEqual(sl[3], 42) 4123 sl[4] = 'some' # Change type at a given position. 4124 self.assertEqual(sl[4], 'some') 4125 self.assertEqual(sl.format, '8s8sdq8sxxxxxxx?q') 4126 with self.assertRaisesRegex(ValueError, 4127 "exceeds available storage"): 4128 sl[4] = 'far too many' 4129 self.assertEqual(sl[4], 'some') 4130 sl[0] = 'encodés' # Exactly 8 bytes of UTF-8 data 4131 self.assertEqual(sl[0], 'encodés') 4132 self.assertEqual(sl[1], b'HoWdY') # no spillage 4133 with self.assertRaisesRegex(ValueError, 4134 "exceeds available storage"): 4135 sl[0] = 'encodées' # Exactly 9 bytes of UTF-8 data 4136 self.assertEqual(sl[1], b'HoWdY') 4137 with self.assertRaisesRegex(ValueError, 4138 "exceeds available storage"): 4139 sl[1] = b'123456789' 4140 self.assertEqual(sl[1], b'HoWdY') 4141 4142 # Exercise count(). 4143 with warnings.catch_warnings(): 4144 # Suppress BytesWarning when comparing against b'HoWdY'. 4145 warnings.simplefilter('ignore') 4146 self.assertEqual(sl.count(42), 2) 4147 self.assertEqual(sl.count(b'HoWdY'), 1) 4148 self.assertEqual(sl.count(b'adios'), 0) 4149 4150 # Exercise creating a duplicate. 4151 name_duplicate = self._new_shm_name('test03_duplicate') 4152 sl_copy = shared_memory.ShareableList(sl, name=name_duplicate) 4153 try: 4154 self.assertNotEqual(sl.shm.name, sl_copy.shm.name) 4155 self.assertEqual(name_duplicate, sl_copy.shm.name) 4156 self.assertEqual(list(sl), list(sl_copy)) 4157 self.assertEqual(sl.format, sl_copy.format) 4158 sl_copy[-1] = 77 4159 self.assertEqual(sl_copy[-1], 77) 4160 self.assertNotEqual(sl[-1], 77) 4161 sl_copy.shm.close() 4162 finally: 4163 sl_copy.shm.unlink() 4164 4165 # Obtain a second handle on the same ShareableList. 4166 sl_tethered = shared_memory.ShareableList(name=sl.shm.name) 4167 self.assertEqual(sl.shm.name, sl_tethered.shm.name) 4168 sl_tethered[-1] = 880 4169 self.assertEqual(sl[-1], 880) 4170 sl_tethered.shm.close() 4171 4172 sl.shm.close() 4173 4174 # Exercise creating an empty ShareableList. 4175 empty_sl = shared_memory.ShareableList() 4176 try: 4177 self.assertEqual(len(empty_sl), 0) 4178 self.assertEqual(empty_sl.format, '') 4179 self.assertEqual(empty_sl.count('any'), 0) 4180 with self.assertRaises(ValueError): 4181 empty_sl.index(None) 4182 empty_sl.shm.close() 4183 finally: 4184 empty_sl.shm.unlink() 4185 4186 def test_shared_memory_ShareableList_pickling(self): 4187 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 4188 with self.subTest(proto=proto): 4189 sl = shared_memory.ShareableList(range(10)) 4190 self.addCleanup(sl.shm.unlink) 4191 4192 serialized_sl = pickle.dumps(sl, protocol=proto) 4193 deserialized_sl = pickle.loads(serialized_sl) 4194 self.assertIsInstance( 4195 deserialized_sl, shared_memory.ShareableList) 4196 self.assertEqual(deserialized_sl[-1], 9) 4197 self.assertIsNot(sl, deserialized_sl) 4198 4199 deserialized_sl[4] = "changed" 4200 self.assertEqual(sl[4], "changed") 4201 sl[3] = "newvalue" 4202 self.assertEqual(deserialized_sl[3], "newvalue") 4203 4204 larger_sl = shared_memory.ShareableList(range(400)) 4205 self.addCleanup(larger_sl.shm.unlink) 4206 serialized_larger_sl = pickle.dumps(larger_sl, protocol=proto) 4207 self.assertEqual(len(serialized_sl), len(serialized_larger_sl)) 4208 larger_sl.shm.close() 4209 4210 deserialized_sl.shm.close() 4211 sl.shm.close() 4212 4213 def test_shared_memory_ShareableList_pickling_dead_object(self): 4214 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 4215 with self.subTest(proto=proto): 4216 sl = shared_memory.ShareableList(range(10)) 4217 serialized_sl = pickle.dumps(sl, protocol=proto) 4218 4219 # Now, we are going to kill the original object. 4220 # So, unpickled one won't be able to attach to it. 4221 sl.shm.close() 4222 sl.shm.unlink() 4223 4224 with self.assertRaises(FileNotFoundError): 4225 pickle.loads(serialized_sl) 4226 4227 def test_shared_memory_cleaned_after_process_termination(self): 4228 cmd = '''if 1: 4229 import os, time, sys 4230 from multiprocessing import shared_memory 4231 4232 # Create a shared_memory segment, and send the segment name 4233 sm = shared_memory.SharedMemory(create=True, size=10) 4234 sys.stdout.write(sm.name + '\\n') 4235 sys.stdout.flush() 4236 time.sleep(100) 4237 ''' 4238 with subprocess.Popen([sys.executable, '-E', '-c', cmd], 4239 stdout=subprocess.PIPE, 4240 stderr=subprocess.PIPE) as p: 4241 name = p.stdout.readline().strip().decode() 4242 4243 # killing abruptly processes holding reference to a shared memory 4244 # segment should not leak the given memory segment. 4245 p.terminate() 4246 p.wait() 4247 4248 deadline = time.monotonic() + support.LONG_TIMEOUT 4249 t = 0.1 4250 while time.monotonic() < deadline: 4251 time.sleep(t) 4252 t = min(t*2, 5) 4253 try: 4254 smm = shared_memory.SharedMemory(name, create=False) 4255 except FileNotFoundError: 4256 break 4257 else: 4258 raise AssertionError("A SharedMemory segment was leaked after" 4259 " a process was abruptly terminated.") 4260 4261 if os.name == 'posix': 4262 # Without this line it was raising warnings like: 4263 # UserWarning: resource_tracker: 4264 # There appear to be 1 leaked shared_memory 4265 # objects to clean up at shutdown 4266 # See: https://bugs.python.org/issue45209 4267 resource_tracker.unregister(f"/{name}", "shared_memory") 4268 4269 # A warning was emitted by the subprocess' own 4270 # resource_tracker (on Windows, shared memory segments 4271 # are released automatically by the OS). 4272 err = p.stderr.read().decode() 4273 self.assertIn( 4274 "resource_tracker: There appear to be 1 leaked " 4275 "shared_memory objects to clean up at shutdown", err) 4276 4277# 4278# Test to verify that `Finalize` works. 4279# 4280 4281class _TestFinalize(BaseTestCase): 4282 4283 ALLOWED_TYPES = ('processes',) 4284 4285 def setUp(self): 4286 self.registry_backup = util._finalizer_registry.copy() 4287 util._finalizer_registry.clear() 4288 4289 def tearDown(self): 4290 gc.collect() # For PyPy or other GCs. 4291 self.assertFalse(util._finalizer_registry) 4292 util._finalizer_registry.update(self.registry_backup) 4293 4294 @classmethod 4295 def _test_finalize(cls, conn): 4296 class Foo(object): 4297 pass 4298 4299 a = Foo() 4300 util.Finalize(a, conn.send, args=('a',)) 4301 del a # triggers callback for a 4302 gc.collect() # For PyPy or other GCs. 4303 4304 b = Foo() 4305 close_b = util.Finalize(b, conn.send, args=('b',)) 4306 close_b() # triggers callback for b 4307 close_b() # does nothing because callback has already been called 4308 del b # does nothing because callback has already been called 4309 gc.collect() # For PyPy or other GCs. 4310 4311 c = Foo() 4312 util.Finalize(c, conn.send, args=('c',)) 4313 4314 d10 = Foo() 4315 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1) 4316 4317 d01 = Foo() 4318 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0) 4319 d02 = Foo() 4320 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0) 4321 d03 = Foo() 4322 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0) 4323 4324 util.Finalize(None, conn.send, args=('e',), exitpriority=-10) 4325 4326 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100) 4327 4328 # call multiprocessing's cleanup function then exit process without 4329 # garbage collecting locals 4330 util._exit_function() 4331 conn.close() 4332 os._exit(0) 4333 4334 def test_finalize(self): 4335 conn, child_conn = self.Pipe() 4336 4337 p = self.Process(target=self._test_finalize, args=(child_conn,)) 4338 p.daemon = True 4339 p.start() 4340 p.join() 4341 4342 result = [obj for obj in iter(conn.recv, 'STOP')] 4343 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e']) 4344 4345 def test_thread_safety(self): 4346 # bpo-24484: _run_finalizers() should be thread-safe 4347 def cb(): 4348 pass 4349 4350 class Foo(object): 4351 def __init__(self): 4352 self.ref = self # create reference cycle 4353 # insert finalizer at random key 4354 util.Finalize(self, cb, exitpriority=random.randint(1, 100)) 4355 4356 finish = False 4357 exc = None 4358 4359 def run_finalizers(): 4360 nonlocal exc 4361 while not finish: 4362 time.sleep(random.random() * 1e-1) 4363 try: 4364 # A GC run will eventually happen during this, 4365 # collecting stale Foo's and mutating the registry 4366 util._run_finalizers() 4367 except Exception as e: 4368 exc = e 4369 4370 def make_finalizers(): 4371 nonlocal exc 4372 d = {} 4373 while not finish: 4374 try: 4375 # Old Foo's get gradually replaced and later 4376 # collected by the GC (because of the cyclic ref) 4377 d[random.getrandbits(5)] = {Foo() for i in range(10)} 4378 except Exception as e: 4379 exc = e 4380 d.clear() 4381 4382 old_interval = sys.getswitchinterval() 4383 old_threshold = gc.get_threshold() 4384 try: 4385 sys.setswitchinterval(1e-6) 4386 gc.set_threshold(5, 5, 5) 4387 threads = [threading.Thread(target=run_finalizers), 4388 threading.Thread(target=make_finalizers)] 4389 with threading_helper.start_threads(threads): 4390 time.sleep(4.0) # Wait a bit to trigger race condition 4391 finish = True 4392 if exc is not None: 4393 raise exc 4394 finally: 4395 sys.setswitchinterval(old_interval) 4396 gc.set_threshold(*old_threshold) 4397 gc.collect() # Collect remaining Foo's 4398 4399 4400# 4401# Test that from ... import * works for each module 4402# 4403 4404class _TestImportStar(unittest.TestCase): 4405 4406 def get_module_names(self): 4407 import glob 4408 folder = os.path.dirname(multiprocessing.__file__) 4409 pattern = os.path.join(glob.escape(folder), '*.py') 4410 files = glob.glob(pattern) 4411 modules = [os.path.splitext(os.path.split(f)[1])[0] for f in files] 4412 modules = ['multiprocessing.' + m for m in modules] 4413 modules.remove('multiprocessing.__init__') 4414 modules.append('multiprocessing') 4415 return modules 4416 4417 def test_import(self): 4418 modules = self.get_module_names() 4419 if sys.platform == 'win32': 4420 modules.remove('multiprocessing.popen_fork') 4421 modules.remove('multiprocessing.popen_forkserver') 4422 modules.remove('multiprocessing.popen_spawn_posix') 4423 else: 4424 modules.remove('multiprocessing.popen_spawn_win32') 4425 if not HAS_REDUCTION: 4426 modules.remove('multiprocessing.popen_forkserver') 4427 4428 if c_int is None: 4429 # This module requires _ctypes 4430 modules.remove('multiprocessing.sharedctypes') 4431 4432 for name in modules: 4433 __import__(name) 4434 mod = sys.modules[name] 4435 self.assertTrue(hasattr(mod, '__all__'), name) 4436 4437 for attr in mod.__all__: 4438 self.assertTrue( 4439 hasattr(mod, attr), 4440 '%r does not have attribute %r' % (mod, attr) 4441 ) 4442 4443# 4444# Quick test that logging works -- does not test logging output 4445# 4446 4447class _TestLogging(BaseTestCase): 4448 4449 ALLOWED_TYPES = ('processes',) 4450 4451 def test_enable_logging(self): 4452 logger = multiprocessing.get_logger() 4453 logger.setLevel(util.SUBWARNING) 4454 self.assertTrue(logger is not None) 4455 logger.debug('this will not be printed') 4456 logger.info('nor will this') 4457 logger.setLevel(LOG_LEVEL) 4458 4459 @classmethod 4460 def _test_level(cls, conn): 4461 logger = multiprocessing.get_logger() 4462 conn.send(logger.getEffectiveLevel()) 4463 4464 def test_level(self): 4465 LEVEL1 = 32 4466 LEVEL2 = 37 4467 4468 logger = multiprocessing.get_logger() 4469 root_logger = logging.getLogger() 4470 root_level = root_logger.level 4471 4472 reader, writer = multiprocessing.Pipe(duplex=False) 4473 4474 logger.setLevel(LEVEL1) 4475 p = self.Process(target=self._test_level, args=(writer,)) 4476 p.start() 4477 self.assertEqual(LEVEL1, reader.recv()) 4478 p.join() 4479 p.close() 4480 4481 logger.setLevel(logging.NOTSET) 4482 root_logger.setLevel(LEVEL2) 4483 p = self.Process(target=self._test_level, args=(writer,)) 4484 p.start() 4485 self.assertEqual(LEVEL2, reader.recv()) 4486 p.join() 4487 p.close() 4488 4489 root_logger.setLevel(root_level) 4490 logger.setLevel(level=LOG_LEVEL) 4491 4492 4493# class _TestLoggingProcessName(BaseTestCase): 4494# 4495# def handle(self, record): 4496# assert record.processName == multiprocessing.current_process().name 4497# self.__handled = True 4498# 4499# def test_logging(self): 4500# handler = logging.Handler() 4501# handler.handle = self.handle 4502# self.__handled = False 4503# # Bypass getLogger() and side-effects 4504# logger = logging.getLoggerClass()( 4505# 'multiprocessing.test.TestLoggingProcessName') 4506# logger.addHandler(handler) 4507# logger.propagate = False 4508# 4509# logger.warn('foo') 4510# assert self.__handled 4511 4512# 4513# Check that Process.join() retries if os.waitpid() fails with EINTR 4514# 4515 4516class _TestPollEintr(BaseTestCase): 4517 4518 ALLOWED_TYPES = ('processes',) 4519 4520 @classmethod 4521 def _killer(cls, pid): 4522 time.sleep(0.1) 4523 os.kill(pid, signal.SIGUSR1) 4524 4525 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') 4526 def test_poll_eintr(self): 4527 got_signal = [False] 4528 def record(*args): 4529 got_signal[0] = True 4530 pid = os.getpid() 4531 oldhandler = signal.signal(signal.SIGUSR1, record) 4532 try: 4533 killer = self.Process(target=self._killer, args=(pid,)) 4534 killer.start() 4535 try: 4536 p = self.Process(target=time.sleep, args=(2,)) 4537 p.start() 4538 p.join() 4539 finally: 4540 killer.join() 4541 self.assertTrue(got_signal[0]) 4542 self.assertEqual(p.exitcode, 0) 4543 finally: 4544 signal.signal(signal.SIGUSR1, oldhandler) 4545 4546# 4547# Test to verify handle verification, see issue 3321 4548# 4549 4550class TestInvalidHandle(unittest.TestCase): 4551 4552 @unittest.skipIf(WIN32, "skipped on Windows") 4553 def test_invalid_handles(self): 4554 conn = multiprocessing.connection.Connection(44977608) 4555 # check that poll() doesn't crash 4556 try: 4557 conn.poll() 4558 except (ValueError, OSError): 4559 pass 4560 finally: 4561 # Hack private attribute _handle to avoid printing an error 4562 # in conn.__del__ 4563 conn._handle = None 4564 self.assertRaises((ValueError, OSError), 4565 multiprocessing.connection.Connection, -1) 4566 4567 4568 4569@hashlib_helper.requires_hashdigest('md5') 4570class OtherTest(unittest.TestCase): 4571 # TODO: add more tests for deliver/answer challenge. 4572 def test_deliver_challenge_auth_failure(self): 4573 class _FakeConnection(object): 4574 def recv_bytes(self, size): 4575 return b'something bogus' 4576 def send_bytes(self, data): 4577 pass 4578 self.assertRaises(multiprocessing.AuthenticationError, 4579 multiprocessing.connection.deliver_challenge, 4580 _FakeConnection(), b'abc') 4581 4582 def test_answer_challenge_auth_failure(self): 4583 class _FakeConnection(object): 4584 def __init__(self): 4585 self.count = 0 4586 def recv_bytes(self, size): 4587 self.count += 1 4588 if self.count == 1: 4589 return multiprocessing.connection.CHALLENGE 4590 elif self.count == 2: 4591 return b'something bogus' 4592 return b'' 4593 def send_bytes(self, data): 4594 pass 4595 self.assertRaises(multiprocessing.AuthenticationError, 4596 multiprocessing.connection.answer_challenge, 4597 _FakeConnection(), b'abc') 4598 4599# 4600# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585 4601# 4602 4603def initializer(ns): 4604 ns.test += 1 4605 4606@hashlib_helper.requires_hashdigest('md5') 4607class TestInitializers(unittest.TestCase): 4608 def setUp(self): 4609 self.mgr = multiprocessing.Manager() 4610 self.ns = self.mgr.Namespace() 4611 self.ns.test = 0 4612 4613 def tearDown(self): 4614 self.mgr.shutdown() 4615 self.mgr.join() 4616 4617 def test_manager_initializer(self): 4618 m = multiprocessing.managers.SyncManager() 4619 self.assertRaises(TypeError, m.start, 1) 4620 m.start(initializer, (self.ns,)) 4621 self.assertEqual(self.ns.test, 1) 4622 m.shutdown() 4623 m.join() 4624 4625 def test_pool_initializer(self): 4626 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1) 4627 p = multiprocessing.Pool(1, initializer, (self.ns,)) 4628 p.close() 4629 p.join() 4630 self.assertEqual(self.ns.test, 1) 4631 4632# 4633# Issue 5155, 5313, 5331: Test process in processes 4634# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior 4635# 4636 4637def _this_sub_process(q): 4638 try: 4639 item = q.get(block=False) 4640 except pyqueue.Empty: 4641 pass 4642 4643def _test_process(): 4644 queue = multiprocessing.Queue() 4645 subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,)) 4646 subProc.daemon = True 4647 subProc.start() 4648 subProc.join() 4649 4650def _afunc(x): 4651 return x*x 4652 4653def pool_in_process(): 4654 pool = multiprocessing.Pool(processes=4) 4655 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7]) 4656 pool.close() 4657 pool.join() 4658 4659class _file_like(object): 4660 def __init__(self, delegate): 4661 self._delegate = delegate 4662 self._pid = None 4663 4664 @property 4665 def cache(self): 4666 pid = os.getpid() 4667 # There are no race conditions since fork keeps only the running thread 4668 if pid != self._pid: 4669 self._pid = pid 4670 self._cache = [] 4671 return self._cache 4672 4673 def write(self, data): 4674 self.cache.append(data) 4675 4676 def flush(self): 4677 self._delegate.write(''.join(self.cache)) 4678 self._cache = [] 4679 4680class TestStdinBadfiledescriptor(unittest.TestCase): 4681 4682 def test_queue_in_process(self): 4683 proc = multiprocessing.Process(target=_test_process) 4684 proc.start() 4685 proc.join() 4686 4687 def test_pool_in_process(self): 4688 p = multiprocessing.Process(target=pool_in_process) 4689 p.start() 4690 p.join() 4691 4692 def test_flushing(self): 4693 sio = io.StringIO() 4694 flike = _file_like(sio) 4695 flike.write('foo') 4696 proc = multiprocessing.Process(target=lambda: flike.flush()) 4697 flike.flush() 4698 assert sio.getvalue() == 'foo' 4699 4700 4701class TestWait(unittest.TestCase): 4702 4703 @classmethod 4704 def _child_test_wait(cls, w, slow): 4705 for i in range(10): 4706 if slow: 4707 time.sleep(random.random()*0.1) 4708 w.send((i, os.getpid())) 4709 w.close() 4710 4711 def test_wait(self, slow=False): 4712 from multiprocessing.connection import wait 4713 readers = [] 4714 procs = [] 4715 messages = [] 4716 4717 for i in range(4): 4718 r, w = multiprocessing.Pipe(duplex=False) 4719 p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow)) 4720 p.daemon = True 4721 p.start() 4722 w.close() 4723 readers.append(r) 4724 procs.append(p) 4725 self.addCleanup(p.join) 4726 4727 while readers: 4728 for r in wait(readers): 4729 try: 4730 msg = r.recv() 4731 except EOFError: 4732 readers.remove(r) 4733 r.close() 4734 else: 4735 messages.append(msg) 4736 4737 messages.sort() 4738 expected = sorted((i, p.pid) for i in range(10) for p in procs) 4739 self.assertEqual(messages, expected) 4740 4741 @classmethod 4742 def _child_test_wait_socket(cls, address, slow): 4743 s = socket.socket() 4744 s.connect(address) 4745 for i in range(10): 4746 if slow: 4747 time.sleep(random.random()*0.1) 4748 s.sendall(('%s\n' % i).encode('ascii')) 4749 s.close() 4750 4751 def test_wait_socket(self, slow=False): 4752 from multiprocessing.connection import wait 4753 l = socket.create_server((socket_helper.HOST, 0)) 4754 addr = l.getsockname() 4755 readers = [] 4756 procs = [] 4757 dic = {} 4758 4759 for i in range(4): 4760 p = multiprocessing.Process(target=self._child_test_wait_socket, 4761 args=(addr, slow)) 4762 p.daemon = True 4763 p.start() 4764 procs.append(p) 4765 self.addCleanup(p.join) 4766 4767 for i in range(4): 4768 r, _ = l.accept() 4769 readers.append(r) 4770 dic[r] = [] 4771 l.close() 4772 4773 while readers: 4774 for r in wait(readers): 4775 msg = r.recv(32) 4776 if not msg: 4777 readers.remove(r) 4778 r.close() 4779 else: 4780 dic[r].append(msg) 4781 4782 expected = ''.join('%s\n' % i for i in range(10)).encode('ascii') 4783 for v in dic.values(): 4784 self.assertEqual(b''.join(v), expected) 4785 4786 def test_wait_slow(self): 4787 self.test_wait(True) 4788 4789 def test_wait_socket_slow(self): 4790 self.test_wait_socket(True) 4791 4792 def test_wait_timeout(self): 4793 from multiprocessing.connection import wait 4794 4795 expected = 5 4796 a, b = multiprocessing.Pipe() 4797 4798 start = time.monotonic() 4799 res = wait([a, b], expected) 4800 delta = time.monotonic() - start 4801 4802 self.assertEqual(res, []) 4803 self.assertLess(delta, expected * 2) 4804 self.assertGreater(delta, expected * 0.5) 4805 4806 b.send(None) 4807 4808 start = time.monotonic() 4809 res = wait([a, b], 20) 4810 delta = time.monotonic() - start 4811 4812 self.assertEqual(res, [a]) 4813 self.assertLess(delta, 0.4) 4814 4815 @classmethod 4816 def signal_and_sleep(cls, sem, period): 4817 sem.release() 4818 time.sleep(period) 4819 4820 def test_wait_integer(self): 4821 from multiprocessing.connection import wait 4822 4823 expected = 3 4824 sorted_ = lambda l: sorted(l, key=lambda x: id(x)) 4825 sem = multiprocessing.Semaphore(0) 4826 a, b = multiprocessing.Pipe() 4827 p = multiprocessing.Process(target=self.signal_and_sleep, 4828 args=(sem, expected)) 4829 4830 p.start() 4831 self.assertIsInstance(p.sentinel, int) 4832 self.assertTrue(sem.acquire(timeout=20)) 4833 4834 start = time.monotonic() 4835 res = wait([a, p.sentinel, b], expected + 20) 4836 delta = time.monotonic() - start 4837 4838 self.assertEqual(res, [p.sentinel]) 4839 self.assertLess(delta, expected + 2) 4840 self.assertGreater(delta, expected - 2) 4841 4842 a.send(None) 4843 4844 start = time.monotonic() 4845 res = wait([a, p.sentinel, b], 20) 4846 delta = time.monotonic() - start 4847 4848 self.assertEqual(sorted_(res), sorted_([p.sentinel, b])) 4849 self.assertLess(delta, 0.4) 4850 4851 b.send(None) 4852 4853 start = time.monotonic() 4854 res = wait([a, p.sentinel, b], 20) 4855 delta = time.monotonic() - start 4856 4857 self.assertEqual(sorted_(res), sorted_([a, p.sentinel, b])) 4858 self.assertLess(delta, 0.4) 4859 4860 p.terminate() 4861 p.join() 4862 4863 def test_neg_timeout(self): 4864 from multiprocessing.connection import wait 4865 a, b = multiprocessing.Pipe() 4866 t = time.monotonic() 4867 res = wait([a], timeout=-1) 4868 t = time.monotonic() - t 4869 self.assertEqual(res, []) 4870 self.assertLess(t, 1) 4871 a.close() 4872 b.close() 4873 4874# 4875# Issue 14151: Test invalid family on invalid environment 4876# 4877 4878class TestInvalidFamily(unittest.TestCase): 4879 4880 @unittest.skipIf(WIN32, "skipped on Windows") 4881 def test_invalid_family(self): 4882 with self.assertRaises(ValueError): 4883 multiprocessing.connection.Listener(r'\\.\test') 4884 4885 @unittest.skipUnless(WIN32, "skipped on non-Windows platforms") 4886 def test_invalid_family_win32(self): 4887 with self.assertRaises(ValueError): 4888 multiprocessing.connection.Listener('/var/test.pipe') 4889 4890# 4891# Issue 12098: check sys.flags of child matches that for parent 4892# 4893 4894class TestFlags(unittest.TestCase): 4895 @classmethod 4896 def run_in_grandchild(cls, conn): 4897 conn.send(tuple(sys.flags)) 4898 4899 @classmethod 4900 def run_in_child(cls): 4901 import json 4902 r, w = multiprocessing.Pipe(duplex=False) 4903 p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,)) 4904 p.start() 4905 grandchild_flags = r.recv() 4906 p.join() 4907 r.close() 4908 w.close() 4909 flags = (tuple(sys.flags), grandchild_flags) 4910 print(json.dumps(flags)) 4911 4912 def test_flags(self): 4913 import json 4914 # start child process using unusual flags 4915 prog = ('from test._test_multiprocessing import TestFlags; ' + 4916 'TestFlags.run_in_child()') 4917 data = subprocess.check_output( 4918 [sys.executable, '-E', '-S', '-O', '-c', prog]) 4919 child_flags, grandchild_flags = json.loads(data.decode('ascii')) 4920 self.assertEqual(child_flags, grandchild_flags) 4921 4922# 4923# Test interaction with socket timeouts - see Issue #6056 4924# 4925 4926class TestTimeouts(unittest.TestCase): 4927 @classmethod 4928 def _test_timeout(cls, child, address): 4929 time.sleep(1) 4930 child.send(123) 4931 child.close() 4932 conn = multiprocessing.connection.Client(address) 4933 conn.send(456) 4934 conn.close() 4935 4936 def test_timeout(self): 4937 old_timeout = socket.getdefaulttimeout() 4938 try: 4939 socket.setdefaulttimeout(0.1) 4940 parent, child = multiprocessing.Pipe(duplex=True) 4941 l = multiprocessing.connection.Listener(family='AF_INET') 4942 p = multiprocessing.Process(target=self._test_timeout, 4943 args=(child, l.address)) 4944 p.start() 4945 child.close() 4946 self.assertEqual(parent.recv(), 123) 4947 parent.close() 4948 conn = l.accept() 4949 self.assertEqual(conn.recv(), 456) 4950 conn.close() 4951 l.close() 4952 join_process(p) 4953 finally: 4954 socket.setdefaulttimeout(old_timeout) 4955 4956# 4957# Test what happens with no "if __name__ == '__main__'" 4958# 4959 4960class TestNoForkBomb(unittest.TestCase): 4961 def test_noforkbomb(self): 4962 sm = multiprocessing.get_start_method() 4963 name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py') 4964 if sm != 'fork': 4965 rc, out, err = test.support.script_helper.assert_python_failure(name, sm) 4966 self.assertEqual(out, b'') 4967 self.assertIn(b'RuntimeError', err) 4968 else: 4969 rc, out, err = test.support.script_helper.assert_python_ok(name, sm) 4970 self.assertEqual(out.rstrip(), b'123') 4971 self.assertEqual(err, b'') 4972 4973# 4974# Issue #17555: ForkAwareThreadLock 4975# 4976 4977class TestForkAwareThreadLock(unittest.TestCase): 4978 # We recursively start processes. Issue #17555 meant that the 4979 # after fork registry would get duplicate entries for the same 4980 # lock. The size of the registry at generation n was ~2**n. 4981 4982 @classmethod 4983 def child(cls, n, conn): 4984 if n > 1: 4985 p = multiprocessing.Process(target=cls.child, args=(n-1, conn)) 4986 p.start() 4987 conn.close() 4988 join_process(p) 4989 else: 4990 conn.send(len(util._afterfork_registry)) 4991 conn.close() 4992 4993 def test_lock(self): 4994 r, w = multiprocessing.Pipe(False) 4995 l = util.ForkAwareThreadLock() 4996 old_size = len(util._afterfork_registry) 4997 p = multiprocessing.Process(target=self.child, args=(5, w)) 4998 p.start() 4999 w.close() 5000 new_size = r.recv() 5001 join_process(p) 5002 self.assertLessEqual(new_size, old_size) 5003 5004# 5005# Check that non-forked child processes do not inherit unneeded fds/handles 5006# 5007 5008class TestCloseFds(unittest.TestCase): 5009 5010 def get_high_socket_fd(self): 5011 if WIN32: 5012 # The child process will not have any socket handles, so 5013 # calling socket.fromfd() should produce WSAENOTSOCK even 5014 # if there is a handle of the same number. 5015 return socket.socket().detach() 5016 else: 5017 # We want to produce a socket with an fd high enough that a 5018 # freshly created child process will not have any fds as high. 5019 fd = socket.socket().detach() 5020 to_close = [] 5021 while fd < 50: 5022 to_close.append(fd) 5023 fd = os.dup(fd) 5024 for x in to_close: 5025 os.close(x) 5026 return fd 5027 5028 def close(self, fd): 5029 if WIN32: 5030 socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=fd).close() 5031 else: 5032 os.close(fd) 5033 5034 @classmethod 5035 def _test_closefds(cls, conn, fd): 5036 try: 5037 s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM) 5038 except Exception as e: 5039 conn.send(e) 5040 else: 5041 s.close() 5042 conn.send(None) 5043 5044 def test_closefd(self): 5045 if not HAS_REDUCTION: 5046 raise unittest.SkipTest('requires fd pickling') 5047 5048 reader, writer = multiprocessing.Pipe() 5049 fd = self.get_high_socket_fd() 5050 try: 5051 p = multiprocessing.Process(target=self._test_closefds, 5052 args=(writer, fd)) 5053 p.start() 5054 writer.close() 5055 e = reader.recv() 5056 join_process(p) 5057 finally: 5058 self.close(fd) 5059 writer.close() 5060 reader.close() 5061 5062 if multiprocessing.get_start_method() == 'fork': 5063 self.assertIs(e, None) 5064 else: 5065 WSAENOTSOCK = 10038 5066 self.assertIsInstance(e, OSError) 5067 self.assertTrue(e.errno == errno.EBADF or 5068 e.winerror == WSAENOTSOCK, e) 5069 5070# 5071# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc 5072# 5073 5074class TestIgnoreEINTR(unittest.TestCase): 5075 5076 # Sending CONN_MAX_SIZE bytes into a multiprocessing pipe must block 5077 CONN_MAX_SIZE = max(support.PIPE_MAX_SIZE, support.SOCK_MAX_SIZE) 5078 5079 @classmethod 5080 def _test_ignore(cls, conn): 5081 def handler(signum, frame): 5082 pass 5083 signal.signal(signal.SIGUSR1, handler) 5084 conn.send('ready') 5085 x = conn.recv() 5086 conn.send(x) 5087 conn.send_bytes(b'x' * cls.CONN_MAX_SIZE) 5088 5089 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') 5090 def test_ignore(self): 5091 conn, child_conn = multiprocessing.Pipe() 5092 try: 5093 p = multiprocessing.Process(target=self._test_ignore, 5094 args=(child_conn,)) 5095 p.daemon = True 5096 p.start() 5097 child_conn.close() 5098 self.assertEqual(conn.recv(), 'ready') 5099 time.sleep(0.1) 5100 os.kill(p.pid, signal.SIGUSR1) 5101 time.sleep(0.1) 5102 conn.send(1234) 5103 self.assertEqual(conn.recv(), 1234) 5104 time.sleep(0.1) 5105 os.kill(p.pid, signal.SIGUSR1) 5106 self.assertEqual(conn.recv_bytes(), b'x' * self.CONN_MAX_SIZE) 5107 time.sleep(0.1) 5108 p.join() 5109 finally: 5110 conn.close() 5111 5112 @classmethod 5113 def _test_ignore_listener(cls, conn): 5114 def handler(signum, frame): 5115 pass 5116 signal.signal(signal.SIGUSR1, handler) 5117 with multiprocessing.connection.Listener() as l: 5118 conn.send(l.address) 5119 a = l.accept() 5120 a.send('welcome') 5121 5122 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') 5123 def test_ignore_listener(self): 5124 conn, child_conn = multiprocessing.Pipe() 5125 try: 5126 p = multiprocessing.Process(target=self._test_ignore_listener, 5127 args=(child_conn,)) 5128 p.daemon = True 5129 p.start() 5130 child_conn.close() 5131 address = conn.recv() 5132 time.sleep(0.1) 5133 os.kill(p.pid, signal.SIGUSR1) 5134 time.sleep(0.1) 5135 client = multiprocessing.connection.Client(address) 5136 self.assertEqual(client.recv(), 'welcome') 5137 p.join() 5138 finally: 5139 conn.close() 5140 5141class TestStartMethod(unittest.TestCase): 5142 @classmethod 5143 def _check_context(cls, conn): 5144 conn.send(multiprocessing.get_start_method()) 5145 5146 def check_context(self, ctx): 5147 r, w = ctx.Pipe(duplex=False) 5148 p = ctx.Process(target=self._check_context, args=(w,)) 5149 p.start() 5150 w.close() 5151 child_method = r.recv() 5152 r.close() 5153 p.join() 5154 self.assertEqual(child_method, ctx.get_start_method()) 5155 5156 def test_context(self): 5157 for method in ('fork', 'spawn', 'forkserver'): 5158 try: 5159 ctx = multiprocessing.get_context(method) 5160 except ValueError: 5161 continue 5162 self.assertEqual(ctx.get_start_method(), method) 5163 self.assertIs(ctx.get_context(), ctx) 5164 self.assertRaises(ValueError, ctx.set_start_method, 'spawn') 5165 self.assertRaises(ValueError, ctx.set_start_method, None) 5166 self.check_context(ctx) 5167 5168 def test_set_get(self): 5169 multiprocessing.set_forkserver_preload(PRELOAD) 5170 count = 0 5171 old_method = multiprocessing.get_start_method() 5172 try: 5173 for method in ('fork', 'spawn', 'forkserver'): 5174 try: 5175 multiprocessing.set_start_method(method, force=True) 5176 except ValueError: 5177 continue 5178 self.assertEqual(multiprocessing.get_start_method(), method) 5179 ctx = multiprocessing.get_context() 5180 self.assertEqual(ctx.get_start_method(), method) 5181 self.assertTrue(type(ctx).__name__.lower().startswith(method)) 5182 self.assertTrue( 5183 ctx.Process.__name__.lower().startswith(method)) 5184 self.check_context(multiprocessing) 5185 count += 1 5186 finally: 5187 multiprocessing.set_start_method(old_method, force=True) 5188 self.assertGreaterEqual(count, 1) 5189 5190 def test_get_all(self): 5191 methods = multiprocessing.get_all_start_methods() 5192 if sys.platform == 'win32': 5193 self.assertEqual(methods, ['spawn']) 5194 else: 5195 self.assertTrue(methods == ['fork', 'spawn'] or 5196 methods == ['spawn', 'fork'] or 5197 methods == ['fork', 'spawn', 'forkserver'] or 5198 methods == ['spawn', 'fork', 'forkserver']) 5199 5200 def test_preload_resources(self): 5201 if multiprocessing.get_start_method() != 'forkserver': 5202 self.skipTest("test only relevant for 'forkserver' method") 5203 name = os.path.join(os.path.dirname(__file__), 'mp_preload.py') 5204 rc, out, err = test.support.script_helper.assert_python_ok(name) 5205 out = out.decode() 5206 err = err.decode() 5207 if out.rstrip() != 'ok' or err != '': 5208 print(out) 5209 print(err) 5210 self.fail("failed spawning forkserver or grandchild") 5211 5212 5213@unittest.skipIf(sys.platform == "win32", 5214 "test semantics don't make sense on Windows") 5215class TestResourceTracker(unittest.TestCase): 5216 5217 def test_resource_tracker(self): 5218 # 5219 # Check that killing process does not leak named semaphores 5220 # 5221 cmd = '''if 1: 5222 import time, os, tempfile 5223 import multiprocessing as mp 5224 from multiprocessing import resource_tracker 5225 from multiprocessing.shared_memory import SharedMemory 5226 5227 mp.set_start_method("spawn") 5228 rand = tempfile._RandomNameSequence() 5229 5230 5231 def create_and_register_resource(rtype): 5232 if rtype == "semaphore": 5233 lock = mp.Lock() 5234 return lock, lock._semlock.name 5235 elif rtype == "shared_memory": 5236 sm = SharedMemory(create=True, size=10) 5237 return sm, sm._name 5238 else: 5239 raise ValueError( 5240 "Resource type {{}} not understood".format(rtype)) 5241 5242 5243 resource1, rname1 = create_and_register_resource("{rtype}") 5244 resource2, rname2 = create_and_register_resource("{rtype}") 5245 5246 os.write({w}, rname1.encode("ascii") + b"\\n") 5247 os.write({w}, rname2.encode("ascii") + b"\\n") 5248 5249 time.sleep(10) 5250 ''' 5251 for rtype in resource_tracker._CLEANUP_FUNCS: 5252 with self.subTest(rtype=rtype): 5253 if rtype == "noop": 5254 # Artefact resource type used by the resource_tracker 5255 continue 5256 r, w = os.pipe() 5257 p = subprocess.Popen([sys.executable, 5258 '-E', '-c', cmd.format(w=w, rtype=rtype)], 5259 pass_fds=[w], 5260 stderr=subprocess.PIPE) 5261 os.close(w) 5262 with open(r, 'rb', closefd=True) as f: 5263 name1 = f.readline().rstrip().decode('ascii') 5264 name2 = f.readline().rstrip().decode('ascii') 5265 _resource_unlink(name1, rtype) 5266 p.terminate() 5267 p.wait() 5268 5269 deadline = time.monotonic() + support.LONG_TIMEOUT 5270 while time.monotonic() < deadline: 5271 time.sleep(.5) 5272 try: 5273 _resource_unlink(name2, rtype) 5274 except OSError as e: 5275 # docs say it should be ENOENT, but OSX seems to give 5276 # EINVAL 5277 self.assertIn(e.errno, (errno.ENOENT, errno.EINVAL)) 5278 break 5279 else: 5280 raise AssertionError( 5281 f"A {rtype} resource was leaked after a process was " 5282 f"abruptly terminated.") 5283 err = p.stderr.read().decode('utf-8') 5284 p.stderr.close() 5285 expected = ('resource_tracker: There appear to be 2 leaked {} ' 5286 'objects'.format( 5287 rtype)) 5288 self.assertRegex(err, expected) 5289 self.assertRegex(err, r'resource_tracker: %r: \[Errno' % name1) 5290 5291 def check_resource_tracker_death(self, signum, should_die): 5292 # bpo-31310: if the semaphore tracker process has died, it should 5293 # be restarted implicitly. 5294 from multiprocessing.resource_tracker import _resource_tracker 5295 pid = _resource_tracker._pid 5296 if pid is not None: 5297 os.kill(pid, signal.SIGKILL) 5298 support.wait_process(pid, exitcode=-signal.SIGKILL) 5299 with warnings.catch_warnings(): 5300 warnings.simplefilter("ignore") 5301 _resource_tracker.ensure_running() 5302 pid = _resource_tracker._pid 5303 5304 os.kill(pid, signum) 5305 time.sleep(1.0) # give it time to die 5306 5307 ctx = multiprocessing.get_context("spawn") 5308 with warnings.catch_warnings(record=True) as all_warn: 5309 warnings.simplefilter("always") 5310 sem = ctx.Semaphore() 5311 sem.acquire() 5312 sem.release() 5313 wr = weakref.ref(sem) 5314 # ensure `sem` gets collected, which triggers communication with 5315 # the semaphore tracker 5316 del sem 5317 gc.collect() 5318 self.assertIsNone(wr()) 5319 if should_die: 5320 self.assertEqual(len(all_warn), 1) 5321 the_warn = all_warn[0] 5322 self.assertTrue(issubclass(the_warn.category, UserWarning)) 5323 self.assertTrue("resource_tracker: process died" 5324 in str(the_warn.message)) 5325 else: 5326 self.assertEqual(len(all_warn), 0) 5327 5328 def test_resource_tracker_sigint(self): 5329 # Catchable signal (ignored by semaphore tracker) 5330 self.check_resource_tracker_death(signal.SIGINT, False) 5331 5332 def test_resource_tracker_sigterm(self): 5333 # Catchable signal (ignored by semaphore tracker) 5334 self.check_resource_tracker_death(signal.SIGTERM, False) 5335 5336 def test_resource_tracker_sigkill(self): 5337 # Uncatchable signal. 5338 self.check_resource_tracker_death(signal.SIGKILL, True) 5339 5340 @staticmethod 5341 def _is_resource_tracker_reused(conn, pid): 5342 from multiprocessing.resource_tracker import _resource_tracker 5343 _resource_tracker.ensure_running() 5344 # The pid should be None in the child process, expect for the fork 5345 # context. It should not be a new value. 5346 reused = _resource_tracker._pid in (None, pid) 5347 reused &= _resource_tracker._check_alive() 5348 conn.send(reused) 5349 5350 def test_resource_tracker_reused(self): 5351 from multiprocessing.resource_tracker import _resource_tracker 5352 _resource_tracker.ensure_running() 5353 pid = _resource_tracker._pid 5354 5355 r, w = multiprocessing.Pipe(duplex=False) 5356 p = multiprocessing.Process(target=self._is_resource_tracker_reused, 5357 args=(w, pid)) 5358 p.start() 5359 is_resource_tracker_reused = r.recv() 5360 5361 # Clean up 5362 p.join() 5363 w.close() 5364 r.close() 5365 5366 self.assertTrue(is_resource_tracker_reused) 5367 5368 5369class TestSimpleQueue(unittest.TestCase): 5370 5371 @classmethod 5372 def _test_empty(cls, queue, child_can_start, parent_can_continue): 5373 child_can_start.wait() 5374 # issue 30301, could fail under spawn and forkserver 5375 try: 5376 queue.put(queue.empty()) 5377 queue.put(queue.empty()) 5378 finally: 5379 parent_can_continue.set() 5380 5381 def test_empty(self): 5382 queue = multiprocessing.SimpleQueue() 5383 child_can_start = multiprocessing.Event() 5384 parent_can_continue = multiprocessing.Event() 5385 5386 proc = multiprocessing.Process( 5387 target=self._test_empty, 5388 args=(queue, child_can_start, parent_can_continue) 5389 ) 5390 proc.daemon = True 5391 proc.start() 5392 5393 self.assertTrue(queue.empty()) 5394 5395 child_can_start.set() 5396 parent_can_continue.wait() 5397 5398 self.assertFalse(queue.empty()) 5399 self.assertEqual(queue.get(), True) 5400 self.assertEqual(queue.get(), False) 5401 self.assertTrue(queue.empty()) 5402 5403 proc.join() 5404 5405 def test_close(self): 5406 queue = multiprocessing.SimpleQueue() 5407 queue.close() 5408 # closing a queue twice should not fail 5409 queue.close() 5410 5411 # Test specific to CPython since it tests private attributes 5412 @test.support.cpython_only 5413 def test_closed(self): 5414 queue = multiprocessing.SimpleQueue() 5415 queue.close() 5416 self.assertTrue(queue._reader.closed) 5417 self.assertTrue(queue._writer.closed) 5418 5419 5420class TestPoolNotLeakOnFailure(unittest.TestCase): 5421 5422 def test_release_unused_processes(self): 5423 # Issue #19675: During pool creation, if we can't create a process, 5424 # don't leak already created ones. 5425 will_fail_in = 3 5426 forked_processes = [] 5427 5428 class FailingForkProcess: 5429 def __init__(self, **kwargs): 5430 self.name = 'Fake Process' 5431 self.exitcode = None 5432 self.state = None 5433 forked_processes.append(self) 5434 5435 def start(self): 5436 nonlocal will_fail_in 5437 if will_fail_in <= 0: 5438 raise OSError("Manually induced OSError") 5439 will_fail_in -= 1 5440 self.state = 'started' 5441 5442 def terminate(self): 5443 self.state = 'stopping' 5444 5445 def join(self): 5446 if self.state == 'stopping': 5447 self.state = 'stopped' 5448 5449 def is_alive(self): 5450 return self.state == 'started' or self.state == 'stopping' 5451 5452 with self.assertRaisesRegex(OSError, 'Manually induced OSError'): 5453 p = multiprocessing.pool.Pool(5, context=unittest.mock.MagicMock( 5454 Process=FailingForkProcess)) 5455 p.close() 5456 p.join() 5457 self.assertFalse( 5458 any(process.is_alive() for process in forked_processes)) 5459 5460 5461@hashlib_helper.requires_hashdigest('md5') 5462class TestSyncManagerTypes(unittest.TestCase): 5463 """Test all the types which can be shared between a parent and a 5464 child process by using a manager which acts as an intermediary 5465 between them. 5466 5467 In the following unit-tests the base type is created in the parent 5468 process, the @classmethod represents the worker process and the 5469 shared object is readable and editable between the two. 5470 5471 # The child. 5472 @classmethod 5473 def _test_list(cls, obj): 5474 assert obj[0] == 5 5475 assert obj.append(6) 5476 5477 # The parent. 5478 def test_list(self): 5479 o = self.manager.list() 5480 o.append(5) 5481 self.run_worker(self._test_list, o) 5482 assert o[1] == 6 5483 """ 5484 manager_class = multiprocessing.managers.SyncManager 5485 5486 def setUp(self): 5487 self.manager = self.manager_class() 5488 self.manager.start() 5489 self.proc = None 5490 5491 def tearDown(self): 5492 if self.proc is not None and self.proc.is_alive(): 5493 self.proc.terminate() 5494 self.proc.join() 5495 self.manager.shutdown() 5496 self.manager = None 5497 self.proc = None 5498 5499 @classmethod 5500 def setUpClass(cls): 5501 support.reap_children() 5502 5503 tearDownClass = setUpClass 5504 5505 def wait_proc_exit(self): 5506 # Only the manager process should be returned by active_children() 5507 # but this can take a bit on slow machines, so wait a few seconds 5508 # if there are other children too (see #17395). 5509 join_process(self.proc) 5510 start_time = time.monotonic() 5511 t = 0.01 5512 while len(multiprocessing.active_children()) > 1: 5513 time.sleep(t) 5514 t *= 2 5515 dt = time.monotonic() - start_time 5516 if dt >= 5.0: 5517 test.support.environment_altered = True 5518 support.print_warning(f"multiprocessing.Manager still has " 5519 f"{multiprocessing.active_children()} " 5520 f"active children after {dt} seconds") 5521 break 5522 5523 def run_worker(self, worker, obj): 5524 self.proc = multiprocessing.Process(target=worker, args=(obj, )) 5525 self.proc.daemon = True 5526 self.proc.start() 5527 self.wait_proc_exit() 5528 self.assertEqual(self.proc.exitcode, 0) 5529 5530 @classmethod 5531 def _test_event(cls, obj): 5532 assert obj.is_set() 5533 obj.wait() 5534 obj.clear() 5535 obj.wait(0.001) 5536 5537 def test_event(self): 5538 o = self.manager.Event() 5539 o.set() 5540 self.run_worker(self._test_event, o) 5541 assert not o.is_set() 5542 o.wait(0.001) 5543 5544 @classmethod 5545 def _test_lock(cls, obj): 5546 obj.acquire() 5547 5548 def test_lock(self, lname="Lock"): 5549 o = getattr(self.manager, lname)() 5550 self.run_worker(self._test_lock, o) 5551 o.release() 5552 self.assertRaises(RuntimeError, o.release) # already released 5553 5554 @classmethod 5555 def _test_rlock(cls, obj): 5556 obj.acquire() 5557 obj.release() 5558 5559 def test_rlock(self, lname="Lock"): 5560 o = getattr(self.manager, lname)() 5561 self.run_worker(self._test_rlock, o) 5562 5563 @classmethod 5564 def _test_semaphore(cls, obj): 5565 obj.acquire() 5566 5567 def test_semaphore(self, sname="Semaphore"): 5568 o = getattr(self.manager, sname)() 5569 self.run_worker(self._test_semaphore, o) 5570 o.release() 5571 5572 def test_bounded_semaphore(self): 5573 self.test_semaphore(sname="BoundedSemaphore") 5574 5575 @classmethod 5576 def _test_condition(cls, obj): 5577 obj.acquire() 5578 obj.release() 5579 5580 def test_condition(self): 5581 o = self.manager.Condition() 5582 self.run_worker(self._test_condition, o) 5583 5584 @classmethod 5585 def _test_barrier(cls, obj): 5586 assert obj.parties == 5 5587 obj.reset() 5588 5589 def test_barrier(self): 5590 o = self.manager.Barrier(5) 5591 self.run_worker(self._test_barrier, o) 5592 5593 @classmethod 5594 def _test_pool(cls, obj): 5595 # TODO: fix https://bugs.python.org/issue35919 5596 with obj: 5597 pass 5598 5599 def test_pool(self): 5600 o = self.manager.Pool(processes=4) 5601 self.run_worker(self._test_pool, o) 5602 5603 @classmethod 5604 def _test_queue(cls, obj): 5605 assert obj.qsize() == 2 5606 assert obj.full() 5607 assert not obj.empty() 5608 assert obj.get() == 5 5609 assert not obj.empty() 5610 assert obj.get() == 6 5611 assert obj.empty() 5612 5613 def test_queue(self, qname="Queue"): 5614 o = getattr(self.manager, qname)(2) 5615 o.put(5) 5616 o.put(6) 5617 self.run_worker(self._test_queue, o) 5618 assert o.empty() 5619 assert not o.full() 5620 5621 def test_joinable_queue(self): 5622 self.test_queue("JoinableQueue") 5623 5624 @classmethod 5625 def _test_list(cls, obj): 5626 assert obj[0] == 5 5627 assert obj.count(5) == 1 5628 assert obj.index(5) == 0 5629 obj.sort() 5630 obj.reverse() 5631 for x in obj: 5632 pass 5633 assert len(obj) == 1 5634 assert obj.pop(0) == 5 5635 5636 def test_list(self): 5637 o = self.manager.list() 5638 o.append(5) 5639 self.run_worker(self._test_list, o) 5640 assert not o 5641 self.assertEqual(len(o), 0) 5642 5643 @classmethod 5644 def _test_dict(cls, obj): 5645 assert len(obj) == 1 5646 assert obj['foo'] == 5 5647 assert obj.get('foo') == 5 5648 assert list(obj.items()) == [('foo', 5)] 5649 assert list(obj.keys()) == ['foo'] 5650 assert list(obj.values()) == [5] 5651 assert obj.copy() == {'foo': 5} 5652 assert obj.popitem() == ('foo', 5) 5653 5654 def test_dict(self): 5655 o = self.manager.dict() 5656 o['foo'] = 5 5657 self.run_worker(self._test_dict, o) 5658 assert not o 5659 self.assertEqual(len(o), 0) 5660 5661 @classmethod 5662 def _test_value(cls, obj): 5663 assert obj.value == 1 5664 assert obj.get() == 1 5665 obj.set(2) 5666 5667 def test_value(self): 5668 o = self.manager.Value('i', 1) 5669 self.run_worker(self._test_value, o) 5670 self.assertEqual(o.value, 2) 5671 self.assertEqual(o.get(), 2) 5672 5673 @classmethod 5674 def _test_array(cls, obj): 5675 assert obj[0] == 0 5676 assert obj[1] == 1 5677 assert len(obj) == 2 5678 assert list(obj) == [0, 1] 5679 5680 def test_array(self): 5681 o = self.manager.Array('i', [0, 1]) 5682 self.run_worker(self._test_array, o) 5683 5684 @classmethod 5685 def _test_namespace(cls, obj): 5686 assert obj.x == 0 5687 assert obj.y == 1 5688 5689 def test_namespace(self): 5690 o = self.manager.Namespace() 5691 o.x = 0 5692 o.y = 1 5693 self.run_worker(self._test_namespace, o) 5694 5695 5696class MiscTestCase(unittest.TestCase): 5697 def test__all__(self): 5698 # Just make sure names in not_exported are excluded 5699 support.check__all__(self, multiprocessing, extra=multiprocessing.__all__, 5700 not_exported=['SUBDEBUG', 'SUBWARNING']) 5701 5702 5703# 5704# Mixins 5705# 5706 5707class BaseMixin(object): 5708 @classmethod 5709 def setUpClass(cls): 5710 cls.dangling = (multiprocessing.process._dangling.copy(), 5711 threading._dangling.copy()) 5712 5713 @classmethod 5714 def tearDownClass(cls): 5715 # bpo-26762: Some multiprocessing objects like Pool create reference 5716 # cycles. Trigger a garbage collection to break these cycles. 5717 test.support.gc_collect() 5718 5719 processes = set(multiprocessing.process._dangling) - set(cls.dangling[0]) 5720 if processes: 5721 test.support.environment_altered = True 5722 support.print_warning(f'Dangling processes: {processes}') 5723 processes = None 5724 5725 threads = set(threading._dangling) - set(cls.dangling[1]) 5726 if threads: 5727 test.support.environment_altered = True 5728 support.print_warning(f'Dangling threads: {threads}') 5729 threads = None 5730 5731 5732class ProcessesMixin(BaseMixin): 5733 TYPE = 'processes' 5734 Process = multiprocessing.Process 5735 connection = multiprocessing.connection 5736 current_process = staticmethod(multiprocessing.current_process) 5737 parent_process = staticmethod(multiprocessing.parent_process) 5738 active_children = staticmethod(multiprocessing.active_children) 5739 Pool = staticmethod(multiprocessing.Pool) 5740 Pipe = staticmethod(multiprocessing.Pipe) 5741 Queue = staticmethod(multiprocessing.Queue) 5742 JoinableQueue = staticmethod(multiprocessing.JoinableQueue) 5743 Lock = staticmethod(multiprocessing.Lock) 5744 RLock = staticmethod(multiprocessing.RLock) 5745 Semaphore = staticmethod(multiprocessing.Semaphore) 5746 BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore) 5747 Condition = staticmethod(multiprocessing.Condition) 5748 Event = staticmethod(multiprocessing.Event) 5749 Barrier = staticmethod(multiprocessing.Barrier) 5750 Value = staticmethod(multiprocessing.Value) 5751 Array = staticmethod(multiprocessing.Array) 5752 RawValue = staticmethod(multiprocessing.RawValue) 5753 RawArray = staticmethod(multiprocessing.RawArray) 5754 5755 5756class ManagerMixin(BaseMixin): 5757 TYPE = 'manager' 5758 Process = multiprocessing.Process 5759 Queue = property(operator.attrgetter('manager.Queue')) 5760 JoinableQueue = property(operator.attrgetter('manager.JoinableQueue')) 5761 Lock = property(operator.attrgetter('manager.Lock')) 5762 RLock = property(operator.attrgetter('manager.RLock')) 5763 Semaphore = property(operator.attrgetter('manager.Semaphore')) 5764 BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore')) 5765 Condition = property(operator.attrgetter('manager.Condition')) 5766 Event = property(operator.attrgetter('manager.Event')) 5767 Barrier = property(operator.attrgetter('manager.Barrier')) 5768 Value = property(operator.attrgetter('manager.Value')) 5769 Array = property(operator.attrgetter('manager.Array')) 5770 list = property(operator.attrgetter('manager.list')) 5771 dict = property(operator.attrgetter('manager.dict')) 5772 Namespace = property(operator.attrgetter('manager.Namespace')) 5773 5774 @classmethod 5775 def Pool(cls, *args, **kwds): 5776 return cls.manager.Pool(*args, **kwds) 5777 5778 @classmethod 5779 def setUpClass(cls): 5780 super().setUpClass() 5781 cls.manager = multiprocessing.Manager() 5782 5783 @classmethod 5784 def tearDownClass(cls): 5785 # only the manager process should be returned by active_children() 5786 # but this can take a bit on slow machines, so wait a few seconds 5787 # if there are other children too (see #17395) 5788 start_time = time.monotonic() 5789 t = 0.01 5790 while len(multiprocessing.active_children()) > 1: 5791 time.sleep(t) 5792 t *= 2 5793 dt = time.monotonic() - start_time 5794 if dt >= 5.0: 5795 test.support.environment_altered = True 5796 support.print_warning(f"multiprocessing.Manager still has " 5797 f"{multiprocessing.active_children()} " 5798 f"active children after {dt} seconds") 5799 break 5800 5801 gc.collect() # do garbage collection 5802 if cls.manager._number_of_objects() != 0: 5803 # This is not really an error since some tests do not 5804 # ensure that all processes which hold a reference to a 5805 # managed object have been joined. 5806 test.support.environment_altered = True 5807 support.print_warning('Shared objects which still exist ' 5808 'at manager shutdown:') 5809 support.print_warning(cls.manager._debug_info()) 5810 cls.manager.shutdown() 5811 cls.manager.join() 5812 cls.manager = None 5813 5814 super().tearDownClass() 5815 5816 5817class ThreadsMixin(BaseMixin): 5818 TYPE = 'threads' 5819 Process = multiprocessing.dummy.Process 5820 connection = multiprocessing.dummy.connection 5821 current_process = staticmethod(multiprocessing.dummy.current_process) 5822 active_children = staticmethod(multiprocessing.dummy.active_children) 5823 Pool = staticmethod(multiprocessing.dummy.Pool) 5824 Pipe = staticmethod(multiprocessing.dummy.Pipe) 5825 Queue = staticmethod(multiprocessing.dummy.Queue) 5826 JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue) 5827 Lock = staticmethod(multiprocessing.dummy.Lock) 5828 RLock = staticmethod(multiprocessing.dummy.RLock) 5829 Semaphore = staticmethod(multiprocessing.dummy.Semaphore) 5830 BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore) 5831 Condition = staticmethod(multiprocessing.dummy.Condition) 5832 Event = staticmethod(multiprocessing.dummy.Event) 5833 Barrier = staticmethod(multiprocessing.dummy.Barrier) 5834 Value = staticmethod(multiprocessing.dummy.Value) 5835 Array = staticmethod(multiprocessing.dummy.Array) 5836 5837# 5838# Functions used to create test cases from the base ones in this module 5839# 5840 5841def install_tests_in_module_dict(remote_globs, start_method): 5842 __module__ = remote_globs['__name__'] 5843 local_globs = globals() 5844 ALL_TYPES = {'processes', 'threads', 'manager'} 5845 5846 for name, base in local_globs.items(): 5847 if not isinstance(base, type): 5848 continue 5849 if issubclass(base, BaseTestCase): 5850 if base is BaseTestCase: 5851 continue 5852 assert set(base.ALLOWED_TYPES) <= ALL_TYPES, base.ALLOWED_TYPES 5853 for type_ in base.ALLOWED_TYPES: 5854 newname = 'With' + type_.capitalize() + name[1:] 5855 Mixin = local_globs[type_.capitalize() + 'Mixin'] 5856 class Temp(base, Mixin, unittest.TestCase): 5857 pass 5858 if type_ == 'manager': 5859 Temp = hashlib_helper.requires_hashdigest('md5')(Temp) 5860 Temp.__name__ = Temp.__qualname__ = newname 5861 Temp.__module__ = __module__ 5862 remote_globs[newname] = Temp 5863 elif issubclass(base, unittest.TestCase): 5864 class Temp(base, object): 5865 pass 5866 Temp.__name__ = Temp.__qualname__ = name 5867 Temp.__module__ = __module__ 5868 remote_globs[name] = Temp 5869 5870 dangling = [None, None] 5871 old_start_method = [None] 5872 5873 def setUpModule(): 5874 multiprocessing.set_forkserver_preload(PRELOAD) 5875 multiprocessing.process._cleanup() 5876 dangling[0] = multiprocessing.process._dangling.copy() 5877 dangling[1] = threading._dangling.copy() 5878 old_start_method[0] = multiprocessing.get_start_method(allow_none=True) 5879 try: 5880 multiprocessing.set_start_method(start_method, force=True) 5881 except ValueError: 5882 raise unittest.SkipTest(start_method + 5883 ' start method not supported') 5884 5885 if sys.platform.startswith("linux"): 5886 try: 5887 lock = multiprocessing.RLock() 5888 except OSError: 5889 raise unittest.SkipTest("OSError raises on RLock creation, " 5890 "see issue 3111!") 5891 check_enough_semaphores() 5892 util.get_temp_dir() # creates temp directory 5893 multiprocessing.get_logger().setLevel(LOG_LEVEL) 5894 5895 def tearDownModule(): 5896 need_sleep = False 5897 5898 # bpo-26762: Some multiprocessing objects like Pool create reference 5899 # cycles. Trigger a garbage collection to break these cycles. 5900 test.support.gc_collect() 5901 5902 multiprocessing.set_start_method(old_start_method[0], force=True) 5903 # pause a bit so we don't get warning about dangling threads/processes 5904 processes = set(multiprocessing.process._dangling) - set(dangling[0]) 5905 if processes: 5906 need_sleep = True 5907 test.support.environment_altered = True 5908 support.print_warning(f'Dangling processes: {processes}') 5909 processes = None 5910 5911 threads = set(threading._dangling) - set(dangling[1]) 5912 if threads: 5913 need_sleep = True 5914 test.support.environment_altered = True 5915 support.print_warning(f'Dangling threads: {threads}') 5916 threads = None 5917 5918 # Sleep 500 ms to give time to child processes to complete. 5919 if need_sleep: 5920 time.sleep(0.5) 5921 5922 multiprocessing.util._cleanup_tests() 5923 5924 remote_globs['setUpModule'] = setUpModule 5925 remote_globs['tearDownModule'] = tearDownModule 5926