1# 2# Unit tests for the multiprocessing package 3# 4 5import unittest 6import unittest.mock 7import queue as pyqueue 8import time 9import io 10import itertools 11import sys 12import os 13import gc 14import errno 15import signal 16import array 17import socket 18import random 19import logging 20import subprocess 21import struct 22import operator 23import pickle 24import weakref 25import warnings 26import test.support 27import test.support.script_helper 28from test import support 29 30 31# Skip tests if _multiprocessing wasn't built. 32_multiprocessing = test.support.import_module('_multiprocessing') 33# Skip tests if sem_open implementation is broken. 34test.support.import_module('multiprocessing.synchronize') 35import threading 36 37import multiprocessing.connection 38import multiprocessing.dummy 39import multiprocessing.heap 40import multiprocessing.managers 41import multiprocessing.pool 42import multiprocessing.queues 43 44from multiprocessing import util 45 46try: 47 from multiprocessing import reduction 48 HAS_REDUCTION = reduction.HAVE_SEND_HANDLE 49except ImportError: 50 HAS_REDUCTION = False 51 52try: 53 from multiprocessing.sharedctypes import Value, copy 54 HAS_SHAREDCTYPES = True 55except ImportError: 56 HAS_SHAREDCTYPES = False 57 58try: 59 from multiprocessing import shared_memory 60 HAS_SHMEM = True 61except ImportError: 62 HAS_SHMEM = False 63 64try: 65 import msvcrt 66except ImportError: 67 msvcrt = None 68 69# 70# 71# 72 73# Timeout to wait until a process completes 74TIMEOUT = 60.0 # seconds 75 76def latin(s): 77 return s.encode('latin') 78 79 80def close_queue(queue): 81 if isinstance(queue, multiprocessing.queues.Queue): 82 queue.close() 83 queue.join_thread() 84 85 86def join_process(process): 87 # Since multiprocessing.Process has the same API than threading.Thread 88 # (join() and is_alive(), the support function can be reused 89 support.join_thread(process, timeout=TIMEOUT) 90 91 92if os.name == "posix": 93 from multiprocessing import resource_tracker 94 95 def _resource_unlink(name, rtype): 96 resource_tracker._CLEANUP_FUNCS[rtype](name) 97 98 99# 100# Constants 101# 102 103LOG_LEVEL = util.SUBWARNING 104#LOG_LEVEL = logging.DEBUG 105 106DELTA = 0.1 107CHECK_TIMINGS = False # making true makes tests take a lot longer 108 # and can sometimes cause some non-serious 109 # failures because some calls block a bit 110 # longer than expected 111if CHECK_TIMINGS: 112 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4 113else: 114 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1 115 116HAVE_GETVALUE = not getattr(_multiprocessing, 117 'HAVE_BROKEN_SEM_GETVALUE', False) 118 119WIN32 = (sys.platform == "win32") 120 121from multiprocessing.connection import wait 122 123def wait_for_handle(handle, timeout): 124 if timeout is not None and timeout < 0.0: 125 timeout = None 126 return wait([handle], timeout) 127 128try: 129 MAXFD = os.sysconf("SC_OPEN_MAX") 130except: 131 MAXFD = 256 132 133# To speed up tests when using the forkserver, we can preload these: 134PRELOAD = ['__main__', 'test.test_multiprocessing_forkserver'] 135 136# 137# Some tests require ctypes 138# 139 140try: 141 from ctypes import Structure, c_int, c_double, c_longlong 142except ImportError: 143 Structure = object 144 c_int = c_double = c_longlong = None 145 146 147def check_enough_semaphores(): 148 """Check that the system supports enough semaphores to run the test.""" 149 # minimum number of semaphores available according to POSIX 150 nsems_min = 256 151 try: 152 nsems = os.sysconf("SC_SEM_NSEMS_MAX") 153 except (AttributeError, ValueError): 154 # sysconf not available or setting not available 155 return 156 if nsems == -1 or nsems >= nsems_min: 157 return 158 raise unittest.SkipTest("The OS doesn't support enough semaphores " 159 "to run the test (required: %d)." % nsems_min) 160 161 162# 163# Creates a wrapper for a function which records the time it takes to finish 164# 165 166class TimingWrapper(object): 167 168 def __init__(self, func): 169 self.func = func 170 self.elapsed = None 171 172 def __call__(self, *args, **kwds): 173 t = time.monotonic() 174 try: 175 return self.func(*args, **kwds) 176 finally: 177 self.elapsed = time.monotonic() - t 178 179# 180# Base class for test cases 181# 182 183class BaseTestCase(object): 184 185 ALLOWED_TYPES = ('processes', 'manager', 'threads') 186 187 def assertTimingAlmostEqual(self, a, b): 188 if CHECK_TIMINGS: 189 self.assertAlmostEqual(a, b, 1) 190 191 def assertReturnsIfImplemented(self, value, func, *args): 192 try: 193 res = func(*args) 194 except NotImplementedError: 195 pass 196 else: 197 return self.assertEqual(value, res) 198 199 # For the sanity of Windows users, rather than crashing or freezing in 200 # multiple ways. 201 def __reduce__(self, *args): 202 raise NotImplementedError("shouldn't try to pickle a test case") 203 204 __reduce_ex__ = __reduce__ 205 206# 207# Return the value of a semaphore 208# 209 210def get_value(self): 211 try: 212 return self.get_value() 213 except AttributeError: 214 try: 215 return self._Semaphore__value 216 except AttributeError: 217 try: 218 return self._value 219 except AttributeError: 220 raise NotImplementedError 221 222# 223# Testcases 224# 225 226class DummyCallable: 227 def __call__(self, q, c): 228 assert isinstance(c, DummyCallable) 229 q.put(5) 230 231 232class _TestProcess(BaseTestCase): 233 234 ALLOWED_TYPES = ('processes', 'threads') 235 236 def test_current(self): 237 if self.TYPE == 'threads': 238 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 239 240 current = self.current_process() 241 authkey = current.authkey 242 243 self.assertTrue(current.is_alive()) 244 self.assertTrue(not current.daemon) 245 self.assertIsInstance(authkey, bytes) 246 self.assertTrue(len(authkey) > 0) 247 self.assertEqual(current.ident, os.getpid()) 248 self.assertEqual(current.exitcode, None) 249 250 def test_daemon_argument(self): 251 if self.TYPE == "threads": 252 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 253 254 # By default uses the current process's daemon flag. 255 proc0 = self.Process(target=self._test) 256 self.assertEqual(proc0.daemon, self.current_process().daemon) 257 proc1 = self.Process(target=self._test, daemon=True) 258 self.assertTrue(proc1.daemon) 259 proc2 = self.Process(target=self._test, daemon=False) 260 self.assertFalse(proc2.daemon) 261 262 @classmethod 263 def _test(cls, q, *args, **kwds): 264 current = cls.current_process() 265 q.put(args) 266 q.put(kwds) 267 q.put(current.name) 268 if cls.TYPE != 'threads': 269 q.put(bytes(current.authkey)) 270 q.put(current.pid) 271 272 def test_parent_process_attributes(self): 273 if self.TYPE == "threads": 274 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 275 276 self.assertIsNone(self.parent_process()) 277 278 rconn, wconn = self.Pipe(duplex=False) 279 p = self.Process(target=self._test_send_parent_process, args=(wconn,)) 280 p.start() 281 p.join() 282 parent_pid, parent_name = rconn.recv() 283 self.assertEqual(parent_pid, self.current_process().pid) 284 self.assertEqual(parent_pid, os.getpid()) 285 self.assertEqual(parent_name, self.current_process().name) 286 287 @classmethod 288 def _test_send_parent_process(cls, wconn): 289 from multiprocessing.process import parent_process 290 wconn.send([parent_process().pid, parent_process().name]) 291 292 def test_parent_process(self): 293 if self.TYPE == "threads": 294 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 295 296 # Launch a child process. Make it launch a grandchild process. Kill the 297 # child process and make sure that the grandchild notices the death of 298 # its parent (a.k.a the child process). 299 rconn, wconn = self.Pipe(duplex=False) 300 p = self.Process( 301 target=self._test_create_grandchild_process, args=(wconn, )) 302 p.start() 303 304 if not rconn.poll(timeout=60): 305 raise AssertionError("Could not communicate with child process") 306 parent_process_status = rconn.recv() 307 self.assertEqual(parent_process_status, "alive") 308 309 p.terminate() 310 p.join() 311 312 if not rconn.poll(timeout=60): 313 raise AssertionError("Could not communicate with child process") 314 parent_process_status = rconn.recv() 315 self.assertEqual(parent_process_status, "not alive") 316 317 @classmethod 318 def _test_create_grandchild_process(cls, wconn): 319 p = cls.Process(target=cls._test_report_parent_status, args=(wconn, )) 320 p.start() 321 time.sleep(300) 322 323 @classmethod 324 def _test_report_parent_status(cls, wconn): 325 from multiprocessing.process import parent_process 326 wconn.send("alive" if parent_process().is_alive() else "not alive") 327 parent_process().join(timeout=5) 328 wconn.send("alive" if parent_process().is_alive() else "not alive") 329 330 def test_process(self): 331 q = self.Queue(1) 332 e = self.Event() 333 args = (q, 1, 2) 334 kwargs = {'hello':23, 'bye':2.54} 335 name = 'SomeProcess' 336 p = self.Process( 337 target=self._test, args=args, kwargs=kwargs, name=name 338 ) 339 p.daemon = True 340 current = self.current_process() 341 342 if self.TYPE != 'threads': 343 self.assertEqual(p.authkey, current.authkey) 344 self.assertEqual(p.is_alive(), False) 345 self.assertEqual(p.daemon, True) 346 self.assertNotIn(p, self.active_children()) 347 self.assertTrue(type(self.active_children()) is list) 348 self.assertEqual(p.exitcode, None) 349 350 p.start() 351 352 self.assertEqual(p.exitcode, None) 353 self.assertEqual(p.is_alive(), True) 354 self.assertIn(p, self.active_children()) 355 356 self.assertEqual(q.get(), args[1:]) 357 self.assertEqual(q.get(), kwargs) 358 self.assertEqual(q.get(), p.name) 359 if self.TYPE != 'threads': 360 self.assertEqual(q.get(), current.authkey) 361 self.assertEqual(q.get(), p.pid) 362 363 p.join() 364 365 self.assertEqual(p.exitcode, 0) 366 self.assertEqual(p.is_alive(), False) 367 self.assertNotIn(p, self.active_children()) 368 close_queue(q) 369 370 @unittest.skipUnless(threading._HAVE_THREAD_NATIVE_ID, "needs native_id") 371 def test_process_mainthread_native_id(self): 372 if self.TYPE == 'threads': 373 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 374 375 current_mainthread_native_id = threading.main_thread().native_id 376 377 q = self.Queue(1) 378 p = self.Process(target=self._test_process_mainthread_native_id, args=(q,)) 379 p.start() 380 381 child_mainthread_native_id = q.get() 382 p.join() 383 close_queue(q) 384 385 self.assertNotEqual(current_mainthread_native_id, child_mainthread_native_id) 386 387 @classmethod 388 def _test_process_mainthread_native_id(cls, q): 389 mainthread_native_id = threading.main_thread().native_id 390 q.put(mainthread_native_id) 391 392 @classmethod 393 def _sleep_some(cls): 394 time.sleep(100) 395 396 @classmethod 397 def _test_sleep(cls, delay): 398 time.sleep(delay) 399 400 def _kill_process(self, meth): 401 if self.TYPE == 'threads': 402 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 403 404 p = self.Process(target=self._sleep_some) 405 p.daemon = True 406 p.start() 407 408 self.assertEqual(p.is_alive(), True) 409 self.assertIn(p, self.active_children()) 410 self.assertEqual(p.exitcode, None) 411 412 join = TimingWrapper(p.join) 413 414 self.assertEqual(join(0), None) 415 self.assertTimingAlmostEqual(join.elapsed, 0.0) 416 self.assertEqual(p.is_alive(), True) 417 418 self.assertEqual(join(-1), None) 419 self.assertTimingAlmostEqual(join.elapsed, 0.0) 420 self.assertEqual(p.is_alive(), True) 421 422 # XXX maybe terminating too soon causes the problems on Gentoo... 423 time.sleep(1) 424 425 meth(p) 426 427 if hasattr(signal, 'alarm'): 428 # On the Gentoo buildbot waitpid() often seems to block forever. 429 # We use alarm() to interrupt it if it blocks for too long. 430 def handler(*args): 431 raise RuntimeError('join took too long: %s' % p) 432 old_handler = signal.signal(signal.SIGALRM, handler) 433 try: 434 signal.alarm(10) 435 self.assertEqual(join(), None) 436 finally: 437 signal.alarm(0) 438 signal.signal(signal.SIGALRM, old_handler) 439 else: 440 self.assertEqual(join(), None) 441 442 self.assertTimingAlmostEqual(join.elapsed, 0.0) 443 444 self.assertEqual(p.is_alive(), False) 445 self.assertNotIn(p, self.active_children()) 446 447 p.join() 448 449 return p.exitcode 450 451 def test_terminate(self): 452 exitcode = self._kill_process(multiprocessing.Process.terminate) 453 if os.name != 'nt': 454 self.assertEqual(exitcode, -signal.SIGTERM) 455 456 def test_kill(self): 457 exitcode = self._kill_process(multiprocessing.Process.kill) 458 if os.name != 'nt': 459 self.assertEqual(exitcode, -signal.SIGKILL) 460 461 def test_cpu_count(self): 462 try: 463 cpus = multiprocessing.cpu_count() 464 except NotImplementedError: 465 cpus = 1 466 self.assertTrue(type(cpus) is int) 467 self.assertTrue(cpus >= 1) 468 469 def test_active_children(self): 470 self.assertEqual(type(self.active_children()), list) 471 472 p = self.Process(target=time.sleep, args=(DELTA,)) 473 self.assertNotIn(p, self.active_children()) 474 475 p.daemon = True 476 p.start() 477 self.assertIn(p, self.active_children()) 478 479 p.join() 480 self.assertNotIn(p, self.active_children()) 481 482 @classmethod 483 def _test_recursion(cls, wconn, id): 484 wconn.send(id) 485 if len(id) < 2: 486 for i in range(2): 487 p = cls.Process( 488 target=cls._test_recursion, args=(wconn, id+[i]) 489 ) 490 p.start() 491 p.join() 492 493 def test_recursion(self): 494 rconn, wconn = self.Pipe(duplex=False) 495 self._test_recursion(wconn, []) 496 497 time.sleep(DELTA) 498 result = [] 499 while rconn.poll(): 500 result.append(rconn.recv()) 501 502 expected = [ 503 [], 504 [0], 505 [0, 0], 506 [0, 1], 507 [1], 508 [1, 0], 509 [1, 1] 510 ] 511 self.assertEqual(result, expected) 512 513 @classmethod 514 def _test_sentinel(cls, event): 515 event.wait(10.0) 516 517 def test_sentinel(self): 518 if self.TYPE == "threads": 519 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 520 event = self.Event() 521 p = self.Process(target=self._test_sentinel, args=(event,)) 522 with self.assertRaises(ValueError): 523 p.sentinel 524 p.start() 525 self.addCleanup(p.join) 526 sentinel = p.sentinel 527 self.assertIsInstance(sentinel, int) 528 self.assertFalse(wait_for_handle(sentinel, timeout=0.0)) 529 event.set() 530 p.join() 531 self.assertTrue(wait_for_handle(sentinel, timeout=1)) 532 533 @classmethod 534 def _test_close(cls, rc=0, q=None): 535 if q is not None: 536 q.get() 537 sys.exit(rc) 538 539 def test_close(self): 540 if self.TYPE == "threads": 541 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 542 q = self.Queue() 543 p = self.Process(target=self._test_close, kwargs={'q': q}) 544 p.daemon = True 545 p.start() 546 self.assertEqual(p.is_alive(), True) 547 # Child is still alive, cannot close 548 with self.assertRaises(ValueError): 549 p.close() 550 551 q.put(None) 552 p.join() 553 self.assertEqual(p.is_alive(), False) 554 self.assertEqual(p.exitcode, 0) 555 p.close() 556 with self.assertRaises(ValueError): 557 p.is_alive() 558 with self.assertRaises(ValueError): 559 p.join() 560 with self.assertRaises(ValueError): 561 p.terminate() 562 p.close() 563 564 wr = weakref.ref(p) 565 del p 566 gc.collect() 567 self.assertIs(wr(), None) 568 569 close_queue(q) 570 571 def test_many_processes(self): 572 if self.TYPE == 'threads': 573 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 574 575 sm = multiprocessing.get_start_method() 576 N = 5 if sm == 'spawn' else 100 577 578 # Try to overwhelm the forkserver loop with events 579 procs = [self.Process(target=self._test_sleep, args=(0.01,)) 580 for i in range(N)] 581 for p in procs: 582 p.start() 583 for p in procs: 584 join_process(p) 585 for p in procs: 586 self.assertEqual(p.exitcode, 0) 587 588 procs = [self.Process(target=self._sleep_some) 589 for i in range(N)] 590 for p in procs: 591 p.start() 592 time.sleep(0.001) # let the children start... 593 for p in procs: 594 p.terminate() 595 for p in procs: 596 join_process(p) 597 if os.name != 'nt': 598 exitcodes = [-signal.SIGTERM] 599 if sys.platform == 'darwin': 600 # bpo-31510: On macOS, killing a freshly started process with 601 # SIGTERM sometimes kills the process with SIGKILL. 602 exitcodes.append(-signal.SIGKILL) 603 for p in procs: 604 self.assertIn(p.exitcode, exitcodes) 605 606 def test_lose_target_ref(self): 607 c = DummyCallable() 608 wr = weakref.ref(c) 609 q = self.Queue() 610 p = self.Process(target=c, args=(q, c)) 611 del c 612 p.start() 613 p.join() 614 self.assertIs(wr(), None) 615 self.assertEqual(q.get(), 5) 616 close_queue(q) 617 618 @classmethod 619 def _test_child_fd_inflation(self, evt, q): 620 q.put(test.support.fd_count()) 621 evt.wait() 622 623 def test_child_fd_inflation(self): 624 # Number of fds in child processes should not grow with the 625 # number of running children. 626 if self.TYPE == 'threads': 627 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 628 629 sm = multiprocessing.get_start_method() 630 if sm == 'fork': 631 # The fork method by design inherits all fds from the parent, 632 # trying to go against it is a lost battle 633 self.skipTest('test not appropriate for {}'.format(sm)) 634 635 N = 5 636 evt = self.Event() 637 q = self.Queue() 638 639 procs = [self.Process(target=self._test_child_fd_inflation, args=(evt, q)) 640 for i in range(N)] 641 for p in procs: 642 p.start() 643 644 try: 645 fd_counts = [q.get() for i in range(N)] 646 self.assertEqual(len(set(fd_counts)), 1, fd_counts) 647 648 finally: 649 evt.set() 650 for p in procs: 651 p.join() 652 close_queue(q) 653 654 @classmethod 655 def _test_wait_for_threads(self, evt): 656 def func1(): 657 time.sleep(0.5) 658 evt.set() 659 660 def func2(): 661 time.sleep(20) 662 evt.clear() 663 664 threading.Thread(target=func1).start() 665 threading.Thread(target=func2, daemon=True).start() 666 667 def test_wait_for_threads(self): 668 # A child process should wait for non-daemonic threads to end 669 # before exiting 670 if self.TYPE == 'threads': 671 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 672 673 evt = self.Event() 674 proc = self.Process(target=self._test_wait_for_threads, args=(evt,)) 675 proc.start() 676 proc.join() 677 self.assertTrue(evt.is_set()) 678 679 @classmethod 680 def _test_error_on_stdio_flush(self, evt, break_std_streams={}): 681 for stream_name, action in break_std_streams.items(): 682 if action == 'close': 683 stream = io.StringIO() 684 stream.close() 685 else: 686 assert action == 'remove' 687 stream = None 688 setattr(sys, stream_name, None) 689 evt.set() 690 691 def test_error_on_stdio_flush_1(self): 692 # Check that Process works with broken standard streams 693 streams = [io.StringIO(), None] 694 streams[0].close() 695 for stream_name in ('stdout', 'stderr'): 696 for stream in streams: 697 old_stream = getattr(sys, stream_name) 698 setattr(sys, stream_name, stream) 699 try: 700 evt = self.Event() 701 proc = self.Process(target=self._test_error_on_stdio_flush, 702 args=(evt,)) 703 proc.start() 704 proc.join() 705 self.assertTrue(evt.is_set()) 706 self.assertEqual(proc.exitcode, 0) 707 finally: 708 setattr(sys, stream_name, old_stream) 709 710 def test_error_on_stdio_flush_2(self): 711 # Same as test_error_on_stdio_flush_1(), but standard streams are 712 # broken by the child process 713 for stream_name in ('stdout', 'stderr'): 714 for action in ('close', 'remove'): 715 old_stream = getattr(sys, stream_name) 716 try: 717 evt = self.Event() 718 proc = self.Process(target=self._test_error_on_stdio_flush, 719 args=(evt, {stream_name: action})) 720 proc.start() 721 proc.join() 722 self.assertTrue(evt.is_set()) 723 self.assertEqual(proc.exitcode, 0) 724 finally: 725 setattr(sys, stream_name, old_stream) 726 727 @classmethod 728 def _sleep_and_set_event(self, evt, delay=0.0): 729 time.sleep(delay) 730 evt.set() 731 732 def check_forkserver_death(self, signum): 733 # bpo-31308: if the forkserver process has died, we should still 734 # be able to create and run new Process instances (the forkserver 735 # is implicitly restarted). 736 if self.TYPE == 'threads': 737 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 738 sm = multiprocessing.get_start_method() 739 if sm != 'forkserver': 740 # The fork method by design inherits all fds from the parent, 741 # trying to go against it is a lost battle 742 self.skipTest('test not appropriate for {}'.format(sm)) 743 744 from multiprocessing.forkserver import _forkserver 745 _forkserver.ensure_running() 746 747 # First process sleeps 500 ms 748 delay = 0.5 749 750 evt = self.Event() 751 proc = self.Process(target=self._sleep_and_set_event, args=(evt, delay)) 752 proc.start() 753 754 pid = _forkserver._forkserver_pid 755 os.kill(pid, signum) 756 # give time to the fork server to die and time to proc to complete 757 time.sleep(delay * 2.0) 758 759 evt2 = self.Event() 760 proc2 = self.Process(target=self._sleep_and_set_event, args=(evt2,)) 761 proc2.start() 762 proc2.join() 763 self.assertTrue(evt2.is_set()) 764 self.assertEqual(proc2.exitcode, 0) 765 766 proc.join() 767 self.assertTrue(evt.is_set()) 768 self.assertIn(proc.exitcode, (0, 255)) 769 770 def test_forkserver_sigint(self): 771 # Catchable signal 772 self.check_forkserver_death(signal.SIGINT) 773 774 def test_forkserver_sigkill(self): 775 # Uncatchable signal 776 if os.name != 'nt': 777 self.check_forkserver_death(signal.SIGKILL) 778 779 780# 781# 782# 783 784class _UpperCaser(multiprocessing.Process): 785 786 def __init__(self): 787 multiprocessing.Process.__init__(self) 788 self.child_conn, self.parent_conn = multiprocessing.Pipe() 789 790 def run(self): 791 self.parent_conn.close() 792 for s in iter(self.child_conn.recv, None): 793 self.child_conn.send(s.upper()) 794 self.child_conn.close() 795 796 def submit(self, s): 797 assert type(s) is str 798 self.parent_conn.send(s) 799 return self.parent_conn.recv() 800 801 def stop(self): 802 self.parent_conn.send(None) 803 self.parent_conn.close() 804 self.child_conn.close() 805 806class _TestSubclassingProcess(BaseTestCase): 807 808 ALLOWED_TYPES = ('processes',) 809 810 def test_subclassing(self): 811 uppercaser = _UpperCaser() 812 uppercaser.daemon = True 813 uppercaser.start() 814 self.assertEqual(uppercaser.submit('hello'), 'HELLO') 815 self.assertEqual(uppercaser.submit('world'), 'WORLD') 816 uppercaser.stop() 817 uppercaser.join() 818 819 def test_stderr_flush(self): 820 # sys.stderr is flushed at process shutdown (issue #13812) 821 if self.TYPE == "threads": 822 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 823 824 testfn = test.support.TESTFN 825 self.addCleanup(test.support.unlink, testfn) 826 proc = self.Process(target=self._test_stderr_flush, args=(testfn,)) 827 proc.start() 828 proc.join() 829 with open(testfn, 'r') as f: 830 err = f.read() 831 # The whole traceback was printed 832 self.assertIn("ZeroDivisionError", err) 833 self.assertIn("test_multiprocessing.py", err) 834 self.assertIn("1/0 # MARKER", err) 835 836 @classmethod 837 def _test_stderr_flush(cls, testfn): 838 fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL) 839 sys.stderr = open(fd, 'w', closefd=False) 840 1/0 # MARKER 841 842 843 @classmethod 844 def _test_sys_exit(cls, reason, testfn): 845 fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL) 846 sys.stderr = open(fd, 'w', closefd=False) 847 sys.exit(reason) 848 849 def test_sys_exit(self): 850 # See Issue 13854 851 if self.TYPE == 'threads': 852 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 853 854 testfn = test.support.TESTFN 855 self.addCleanup(test.support.unlink, testfn) 856 857 for reason in ( 858 [1, 2, 3], 859 'ignore this', 860 ): 861 p = self.Process(target=self._test_sys_exit, args=(reason, testfn)) 862 p.daemon = True 863 p.start() 864 join_process(p) 865 self.assertEqual(p.exitcode, 1) 866 867 with open(testfn, 'r') as f: 868 content = f.read() 869 self.assertEqual(content.rstrip(), str(reason)) 870 871 os.unlink(testfn) 872 873 for reason in (True, False, 8): 874 p = self.Process(target=sys.exit, args=(reason,)) 875 p.daemon = True 876 p.start() 877 join_process(p) 878 self.assertEqual(p.exitcode, reason) 879 880# 881# 882# 883 884def queue_empty(q): 885 if hasattr(q, 'empty'): 886 return q.empty() 887 else: 888 return q.qsize() == 0 889 890def queue_full(q, maxsize): 891 if hasattr(q, 'full'): 892 return q.full() 893 else: 894 return q.qsize() == maxsize 895 896 897class _TestQueue(BaseTestCase): 898 899 900 @classmethod 901 def _test_put(cls, queue, child_can_start, parent_can_continue): 902 child_can_start.wait() 903 for i in range(6): 904 queue.get() 905 parent_can_continue.set() 906 907 def test_put(self): 908 MAXSIZE = 6 909 queue = self.Queue(maxsize=MAXSIZE) 910 child_can_start = self.Event() 911 parent_can_continue = self.Event() 912 913 proc = self.Process( 914 target=self._test_put, 915 args=(queue, child_can_start, parent_can_continue) 916 ) 917 proc.daemon = True 918 proc.start() 919 920 self.assertEqual(queue_empty(queue), True) 921 self.assertEqual(queue_full(queue, MAXSIZE), False) 922 923 queue.put(1) 924 queue.put(2, True) 925 queue.put(3, True, None) 926 queue.put(4, False) 927 queue.put(5, False, None) 928 queue.put_nowait(6) 929 930 # the values may be in buffer but not yet in pipe so sleep a bit 931 time.sleep(DELTA) 932 933 self.assertEqual(queue_empty(queue), False) 934 self.assertEqual(queue_full(queue, MAXSIZE), True) 935 936 put = TimingWrapper(queue.put) 937 put_nowait = TimingWrapper(queue.put_nowait) 938 939 self.assertRaises(pyqueue.Full, put, 7, False) 940 self.assertTimingAlmostEqual(put.elapsed, 0) 941 942 self.assertRaises(pyqueue.Full, put, 7, False, None) 943 self.assertTimingAlmostEqual(put.elapsed, 0) 944 945 self.assertRaises(pyqueue.Full, put_nowait, 7) 946 self.assertTimingAlmostEqual(put_nowait.elapsed, 0) 947 948 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1) 949 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1) 950 951 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2) 952 self.assertTimingAlmostEqual(put.elapsed, 0) 953 954 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3) 955 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3) 956 957 child_can_start.set() 958 parent_can_continue.wait() 959 960 self.assertEqual(queue_empty(queue), True) 961 self.assertEqual(queue_full(queue, MAXSIZE), False) 962 963 proc.join() 964 close_queue(queue) 965 966 @classmethod 967 def _test_get(cls, queue, child_can_start, parent_can_continue): 968 child_can_start.wait() 969 #queue.put(1) 970 queue.put(2) 971 queue.put(3) 972 queue.put(4) 973 queue.put(5) 974 parent_can_continue.set() 975 976 def test_get(self): 977 queue = self.Queue() 978 child_can_start = self.Event() 979 parent_can_continue = self.Event() 980 981 proc = self.Process( 982 target=self._test_get, 983 args=(queue, child_can_start, parent_can_continue) 984 ) 985 proc.daemon = True 986 proc.start() 987 988 self.assertEqual(queue_empty(queue), True) 989 990 child_can_start.set() 991 parent_can_continue.wait() 992 993 time.sleep(DELTA) 994 self.assertEqual(queue_empty(queue), False) 995 996 # Hangs unexpectedly, remove for now 997 #self.assertEqual(queue.get(), 1) 998 self.assertEqual(queue.get(True, None), 2) 999 self.assertEqual(queue.get(True), 3) 1000 self.assertEqual(queue.get(timeout=1), 4) 1001 self.assertEqual(queue.get_nowait(), 5) 1002 1003 self.assertEqual(queue_empty(queue), True) 1004 1005 get = TimingWrapper(queue.get) 1006 get_nowait = TimingWrapper(queue.get_nowait) 1007 1008 self.assertRaises(pyqueue.Empty, get, False) 1009 self.assertTimingAlmostEqual(get.elapsed, 0) 1010 1011 self.assertRaises(pyqueue.Empty, get, False, None) 1012 self.assertTimingAlmostEqual(get.elapsed, 0) 1013 1014 self.assertRaises(pyqueue.Empty, get_nowait) 1015 self.assertTimingAlmostEqual(get_nowait.elapsed, 0) 1016 1017 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1) 1018 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1) 1019 1020 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2) 1021 self.assertTimingAlmostEqual(get.elapsed, 0) 1022 1023 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3) 1024 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3) 1025 1026 proc.join() 1027 close_queue(queue) 1028 1029 @classmethod 1030 def _test_fork(cls, queue): 1031 for i in range(10, 20): 1032 queue.put(i) 1033 # note that at this point the items may only be buffered, so the 1034 # process cannot shutdown until the feeder thread has finished 1035 # pushing items onto the pipe. 1036 1037 def test_fork(self): 1038 # Old versions of Queue would fail to create a new feeder 1039 # thread for a forked process if the original process had its 1040 # own feeder thread. This test checks that this no longer 1041 # happens. 1042 1043 queue = self.Queue() 1044 1045 # put items on queue so that main process starts a feeder thread 1046 for i in range(10): 1047 queue.put(i) 1048 1049 # wait to make sure thread starts before we fork a new process 1050 time.sleep(DELTA) 1051 1052 # fork process 1053 p = self.Process(target=self._test_fork, args=(queue,)) 1054 p.daemon = True 1055 p.start() 1056 1057 # check that all expected items are in the queue 1058 for i in range(20): 1059 self.assertEqual(queue.get(), i) 1060 self.assertRaises(pyqueue.Empty, queue.get, False) 1061 1062 p.join() 1063 close_queue(queue) 1064 1065 def test_qsize(self): 1066 q = self.Queue() 1067 try: 1068 self.assertEqual(q.qsize(), 0) 1069 except NotImplementedError: 1070 self.skipTest('qsize method not implemented') 1071 q.put(1) 1072 self.assertEqual(q.qsize(), 1) 1073 q.put(5) 1074 self.assertEqual(q.qsize(), 2) 1075 q.get() 1076 self.assertEqual(q.qsize(), 1) 1077 q.get() 1078 self.assertEqual(q.qsize(), 0) 1079 close_queue(q) 1080 1081 @classmethod 1082 def _test_task_done(cls, q): 1083 for obj in iter(q.get, None): 1084 time.sleep(DELTA) 1085 q.task_done() 1086 1087 def test_task_done(self): 1088 queue = self.JoinableQueue() 1089 1090 workers = [self.Process(target=self._test_task_done, args=(queue,)) 1091 for i in range(4)] 1092 1093 for p in workers: 1094 p.daemon = True 1095 p.start() 1096 1097 for i in range(10): 1098 queue.put(i) 1099 1100 queue.join() 1101 1102 for p in workers: 1103 queue.put(None) 1104 1105 for p in workers: 1106 p.join() 1107 close_queue(queue) 1108 1109 def test_no_import_lock_contention(self): 1110 with test.support.temp_cwd(): 1111 module_name = 'imported_by_an_imported_module' 1112 with open(module_name + '.py', 'w') as f: 1113 f.write("""if 1: 1114 import multiprocessing 1115 1116 q = multiprocessing.Queue() 1117 q.put('knock knock') 1118 q.get(timeout=3) 1119 q.close() 1120 del q 1121 """) 1122 1123 with test.support.DirsOnSysPath(os.getcwd()): 1124 try: 1125 __import__(module_name) 1126 except pyqueue.Empty: 1127 self.fail("Probable regression on import lock contention;" 1128 " see Issue #22853") 1129 1130 def test_timeout(self): 1131 q = multiprocessing.Queue() 1132 start = time.monotonic() 1133 self.assertRaises(pyqueue.Empty, q.get, True, 0.200) 1134 delta = time.monotonic() - start 1135 # bpo-30317: Tolerate a delta of 100 ms because of the bad clock 1136 # resolution on Windows (usually 15.6 ms). x86 Windows7 3.x once 1137 # failed because the delta was only 135.8 ms. 1138 self.assertGreaterEqual(delta, 0.100) 1139 close_queue(q) 1140 1141 def test_queue_feeder_donot_stop_onexc(self): 1142 # bpo-30414: verify feeder handles exceptions correctly 1143 if self.TYPE != 'processes': 1144 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1145 1146 class NotSerializable(object): 1147 def __reduce__(self): 1148 raise AttributeError 1149 with test.support.captured_stderr(): 1150 q = self.Queue() 1151 q.put(NotSerializable()) 1152 q.put(True) 1153 self.assertTrue(q.get(timeout=TIMEOUT)) 1154 close_queue(q) 1155 1156 with test.support.captured_stderr(): 1157 # bpo-33078: verify that the queue size is correctly handled 1158 # on errors. 1159 q = self.Queue(maxsize=1) 1160 q.put(NotSerializable()) 1161 q.put(True) 1162 try: 1163 self.assertEqual(q.qsize(), 1) 1164 except NotImplementedError: 1165 # qsize is not available on all platform as it 1166 # relies on sem_getvalue 1167 pass 1168 # bpo-30595: use a timeout of 1 second for slow buildbots 1169 self.assertTrue(q.get(timeout=1.0)) 1170 # Check that the size of the queue is correct 1171 self.assertTrue(q.empty()) 1172 close_queue(q) 1173 1174 def test_queue_feeder_on_queue_feeder_error(self): 1175 # bpo-30006: verify feeder handles exceptions using the 1176 # _on_queue_feeder_error hook. 1177 if self.TYPE != 'processes': 1178 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1179 1180 class NotSerializable(object): 1181 """Mock unserializable object""" 1182 def __init__(self): 1183 self.reduce_was_called = False 1184 self.on_queue_feeder_error_was_called = False 1185 1186 def __reduce__(self): 1187 self.reduce_was_called = True 1188 raise AttributeError 1189 1190 class SafeQueue(multiprocessing.queues.Queue): 1191 """Queue with overloaded _on_queue_feeder_error hook""" 1192 @staticmethod 1193 def _on_queue_feeder_error(e, obj): 1194 if (isinstance(e, AttributeError) and 1195 isinstance(obj, NotSerializable)): 1196 obj.on_queue_feeder_error_was_called = True 1197 1198 not_serializable_obj = NotSerializable() 1199 # The captured_stderr reduces the noise in the test report 1200 with test.support.captured_stderr(): 1201 q = SafeQueue(ctx=multiprocessing.get_context()) 1202 q.put(not_serializable_obj) 1203 1204 # Verify that q is still functioning correctly 1205 q.put(True) 1206 self.assertTrue(q.get(timeout=1.0)) 1207 1208 # Assert that the serialization and the hook have been called correctly 1209 self.assertTrue(not_serializable_obj.reduce_was_called) 1210 self.assertTrue(not_serializable_obj.on_queue_feeder_error_was_called) 1211 1212 def test_closed_queue_put_get_exceptions(self): 1213 for q in multiprocessing.Queue(), multiprocessing.JoinableQueue(): 1214 q.close() 1215 with self.assertRaisesRegex(ValueError, 'is closed'): 1216 q.put('foo') 1217 with self.assertRaisesRegex(ValueError, 'is closed'): 1218 q.get() 1219# 1220# 1221# 1222 1223class _TestLock(BaseTestCase): 1224 1225 def test_lock(self): 1226 lock = self.Lock() 1227 self.assertEqual(lock.acquire(), True) 1228 self.assertEqual(lock.acquire(False), False) 1229 self.assertEqual(lock.release(), None) 1230 self.assertRaises((ValueError, threading.ThreadError), lock.release) 1231 1232 def test_rlock(self): 1233 lock = self.RLock() 1234 self.assertEqual(lock.acquire(), True) 1235 self.assertEqual(lock.acquire(), True) 1236 self.assertEqual(lock.acquire(), True) 1237 self.assertEqual(lock.release(), None) 1238 self.assertEqual(lock.release(), None) 1239 self.assertEqual(lock.release(), None) 1240 self.assertRaises((AssertionError, RuntimeError), lock.release) 1241 1242 def test_lock_context(self): 1243 with self.Lock(): 1244 pass 1245 1246 1247class _TestSemaphore(BaseTestCase): 1248 1249 def _test_semaphore(self, sem): 1250 self.assertReturnsIfImplemented(2, get_value, sem) 1251 self.assertEqual(sem.acquire(), True) 1252 self.assertReturnsIfImplemented(1, get_value, sem) 1253 self.assertEqual(sem.acquire(), True) 1254 self.assertReturnsIfImplemented(0, get_value, sem) 1255 self.assertEqual(sem.acquire(False), False) 1256 self.assertReturnsIfImplemented(0, get_value, sem) 1257 self.assertEqual(sem.release(), None) 1258 self.assertReturnsIfImplemented(1, get_value, sem) 1259 self.assertEqual(sem.release(), None) 1260 self.assertReturnsIfImplemented(2, get_value, sem) 1261 1262 def test_semaphore(self): 1263 sem = self.Semaphore(2) 1264 self._test_semaphore(sem) 1265 self.assertEqual(sem.release(), None) 1266 self.assertReturnsIfImplemented(3, get_value, sem) 1267 self.assertEqual(sem.release(), None) 1268 self.assertReturnsIfImplemented(4, get_value, sem) 1269 1270 def test_bounded_semaphore(self): 1271 sem = self.BoundedSemaphore(2) 1272 self._test_semaphore(sem) 1273 # Currently fails on OS/X 1274 #if HAVE_GETVALUE: 1275 # self.assertRaises(ValueError, sem.release) 1276 # self.assertReturnsIfImplemented(2, get_value, sem) 1277 1278 def test_timeout(self): 1279 if self.TYPE != 'processes': 1280 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1281 1282 sem = self.Semaphore(0) 1283 acquire = TimingWrapper(sem.acquire) 1284 1285 self.assertEqual(acquire(False), False) 1286 self.assertTimingAlmostEqual(acquire.elapsed, 0.0) 1287 1288 self.assertEqual(acquire(False, None), False) 1289 self.assertTimingAlmostEqual(acquire.elapsed, 0.0) 1290 1291 self.assertEqual(acquire(False, TIMEOUT1), False) 1292 self.assertTimingAlmostEqual(acquire.elapsed, 0) 1293 1294 self.assertEqual(acquire(True, TIMEOUT2), False) 1295 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2) 1296 1297 self.assertEqual(acquire(timeout=TIMEOUT3), False) 1298 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3) 1299 1300 1301class _TestCondition(BaseTestCase): 1302 1303 @classmethod 1304 def f(cls, cond, sleeping, woken, timeout=None): 1305 cond.acquire() 1306 sleeping.release() 1307 cond.wait(timeout) 1308 woken.release() 1309 cond.release() 1310 1311 def assertReachesEventually(self, func, value): 1312 for i in range(10): 1313 try: 1314 if func() == value: 1315 break 1316 except NotImplementedError: 1317 break 1318 time.sleep(DELTA) 1319 time.sleep(DELTA) 1320 self.assertReturnsIfImplemented(value, func) 1321 1322 def check_invariant(self, cond): 1323 # this is only supposed to succeed when there are no sleepers 1324 if self.TYPE == 'processes': 1325 try: 1326 sleepers = (cond._sleeping_count.get_value() - 1327 cond._woken_count.get_value()) 1328 self.assertEqual(sleepers, 0) 1329 self.assertEqual(cond._wait_semaphore.get_value(), 0) 1330 except NotImplementedError: 1331 pass 1332 1333 def test_notify(self): 1334 cond = self.Condition() 1335 sleeping = self.Semaphore(0) 1336 woken = self.Semaphore(0) 1337 1338 p = self.Process(target=self.f, args=(cond, sleeping, woken)) 1339 p.daemon = True 1340 p.start() 1341 self.addCleanup(p.join) 1342 1343 p = threading.Thread(target=self.f, args=(cond, sleeping, woken)) 1344 p.daemon = True 1345 p.start() 1346 self.addCleanup(p.join) 1347 1348 # wait for both children to start sleeping 1349 sleeping.acquire() 1350 sleeping.acquire() 1351 1352 # check no process/thread has woken up 1353 time.sleep(DELTA) 1354 self.assertReturnsIfImplemented(0, get_value, woken) 1355 1356 # wake up one process/thread 1357 cond.acquire() 1358 cond.notify() 1359 cond.release() 1360 1361 # check one process/thread has woken up 1362 time.sleep(DELTA) 1363 self.assertReturnsIfImplemented(1, get_value, woken) 1364 1365 # wake up another 1366 cond.acquire() 1367 cond.notify() 1368 cond.release() 1369 1370 # check other has woken up 1371 time.sleep(DELTA) 1372 self.assertReturnsIfImplemented(2, get_value, woken) 1373 1374 # check state is not mucked up 1375 self.check_invariant(cond) 1376 p.join() 1377 1378 def test_notify_all(self): 1379 cond = self.Condition() 1380 sleeping = self.Semaphore(0) 1381 woken = self.Semaphore(0) 1382 1383 # start some threads/processes which will timeout 1384 for i in range(3): 1385 p = self.Process(target=self.f, 1386 args=(cond, sleeping, woken, TIMEOUT1)) 1387 p.daemon = True 1388 p.start() 1389 self.addCleanup(p.join) 1390 1391 t = threading.Thread(target=self.f, 1392 args=(cond, sleeping, woken, TIMEOUT1)) 1393 t.daemon = True 1394 t.start() 1395 self.addCleanup(t.join) 1396 1397 # wait for them all to sleep 1398 for i in range(6): 1399 sleeping.acquire() 1400 1401 # check they have all timed out 1402 for i in range(6): 1403 woken.acquire() 1404 self.assertReturnsIfImplemented(0, get_value, woken) 1405 1406 # check state is not mucked up 1407 self.check_invariant(cond) 1408 1409 # start some more threads/processes 1410 for i in range(3): 1411 p = self.Process(target=self.f, args=(cond, sleeping, woken)) 1412 p.daemon = True 1413 p.start() 1414 self.addCleanup(p.join) 1415 1416 t = threading.Thread(target=self.f, args=(cond, sleeping, woken)) 1417 t.daemon = True 1418 t.start() 1419 self.addCleanup(t.join) 1420 1421 # wait for them to all sleep 1422 for i in range(6): 1423 sleeping.acquire() 1424 1425 # check no process/thread has woken up 1426 time.sleep(DELTA) 1427 self.assertReturnsIfImplemented(0, get_value, woken) 1428 1429 # wake them all up 1430 cond.acquire() 1431 cond.notify_all() 1432 cond.release() 1433 1434 # check they have all woken 1435 self.assertReachesEventually(lambda: get_value(woken), 6) 1436 1437 # check state is not mucked up 1438 self.check_invariant(cond) 1439 1440 def test_notify_n(self): 1441 cond = self.Condition() 1442 sleeping = self.Semaphore(0) 1443 woken = self.Semaphore(0) 1444 1445 # start some threads/processes 1446 for i in range(3): 1447 p = self.Process(target=self.f, args=(cond, sleeping, woken)) 1448 p.daemon = True 1449 p.start() 1450 self.addCleanup(p.join) 1451 1452 t = threading.Thread(target=self.f, args=(cond, sleeping, woken)) 1453 t.daemon = True 1454 t.start() 1455 self.addCleanup(t.join) 1456 1457 # wait for them to all sleep 1458 for i in range(6): 1459 sleeping.acquire() 1460 1461 # check no process/thread has woken up 1462 time.sleep(DELTA) 1463 self.assertReturnsIfImplemented(0, get_value, woken) 1464 1465 # wake some of them up 1466 cond.acquire() 1467 cond.notify(n=2) 1468 cond.release() 1469 1470 # check 2 have woken 1471 self.assertReachesEventually(lambda: get_value(woken), 2) 1472 1473 # wake the rest of them 1474 cond.acquire() 1475 cond.notify(n=4) 1476 cond.release() 1477 1478 self.assertReachesEventually(lambda: get_value(woken), 6) 1479 1480 # doesn't do anything more 1481 cond.acquire() 1482 cond.notify(n=3) 1483 cond.release() 1484 1485 self.assertReturnsIfImplemented(6, get_value, woken) 1486 1487 # check state is not mucked up 1488 self.check_invariant(cond) 1489 1490 def test_timeout(self): 1491 cond = self.Condition() 1492 wait = TimingWrapper(cond.wait) 1493 cond.acquire() 1494 res = wait(TIMEOUT1) 1495 cond.release() 1496 self.assertEqual(res, False) 1497 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1) 1498 1499 @classmethod 1500 def _test_waitfor_f(cls, cond, state): 1501 with cond: 1502 state.value = 0 1503 cond.notify() 1504 result = cond.wait_for(lambda : state.value==4) 1505 if not result or state.value != 4: 1506 sys.exit(1) 1507 1508 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes') 1509 def test_waitfor(self): 1510 # based on test in test/lock_tests.py 1511 cond = self.Condition() 1512 state = self.Value('i', -1) 1513 1514 p = self.Process(target=self._test_waitfor_f, args=(cond, state)) 1515 p.daemon = True 1516 p.start() 1517 1518 with cond: 1519 result = cond.wait_for(lambda : state.value==0) 1520 self.assertTrue(result) 1521 self.assertEqual(state.value, 0) 1522 1523 for i in range(4): 1524 time.sleep(0.01) 1525 with cond: 1526 state.value += 1 1527 cond.notify() 1528 1529 join_process(p) 1530 self.assertEqual(p.exitcode, 0) 1531 1532 @classmethod 1533 def _test_waitfor_timeout_f(cls, cond, state, success, sem): 1534 sem.release() 1535 with cond: 1536 expected = 0.1 1537 dt = time.monotonic() 1538 result = cond.wait_for(lambda : state.value==4, timeout=expected) 1539 dt = time.monotonic() - dt 1540 # borrow logic in assertTimeout() from test/lock_tests.py 1541 if not result and expected * 0.6 < dt < expected * 10.0: 1542 success.value = True 1543 1544 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes') 1545 def test_waitfor_timeout(self): 1546 # based on test in test/lock_tests.py 1547 cond = self.Condition() 1548 state = self.Value('i', 0) 1549 success = self.Value('i', False) 1550 sem = self.Semaphore(0) 1551 1552 p = self.Process(target=self._test_waitfor_timeout_f, 1553 args=(cond, state, success, sem)) 1554 p.daemon = True 1555 p.start() 1556 self.assertTrue(sem.acquire(timeout=TIMEOUT)) 1557 1558 # Only increment 3 times, so state == 4 is never reached. 1559 for i in range(3): 1560 time.sleep(0.01) 1561 with cond: 1562 state.value += 1 1563 cond.notify() 1564 1565 join_process(p) 1566 self.assertTrue(success.value) 1567 1568 @classmethod 1569 def _test_wait_result(cls, c, pid): 1570 with c: 1571 c.notify() 1572 time.sleep(1) 1573 if pid is not None: 1574 os.kill(pid, signal.SIGINT) 1575 1576 def test_wait_result(self): 1577 if isinstance(self, ProcessesMixin) and sys.platform != 'win32': 1578 pid = os.getpid() 1579 else: 1580 pid = None 1581 1582 c = self.Condition() 1583 with c: 1584 self.assertFalse(c.wait(0)) 1585 self.assertFalse(c.wait(0.1)) 1586 1587 p = self.Process(target=self._test_wait_result, args=(c, pid)) 1588 p.start() 1589 1590 self.assertTrue(c.wait(60)) 1591 if pid is not None: 1592 self.assertRaises(KeyboardInterrupt, c.wait, 60) 1593 1594 p.join() 1595 1596 1597class _TestEvent(BaseTestCase): 1598 1599 @classmethod 1600 def _test_event(cls, event): 1601 time.sleep(TIMEOUT2) 1602 event.set() 1603 1604 def test_event(self): 1605 event = self.Event() 1606 wait = TimingWrapper(event.wait) 1607 1608 # Removed temporarily, due to API shear, this does not 1609 # work with threading._Event objects. is_set == isSet 1610 self.assertEqual(event.is_set(), False) 1611 1612 # Removed, threading.Event.wait() will return the value of the __flag 1613 # instead of None. API Shear with the semaphore backed mp.Event 1614 self.assertEqual(wait(0.0), False) 1615 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 1616 self.assertEqual(wait(TIMEOUT1), False) 1617 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1) 1618 1619 event.set() 1620 1621 # See note above on the API differences 1622 self.assertEqual(event.is_set(), True) 1623 self.assertEqual(wait(), True) 1624 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 1625 self.assertEqual(wait(TIMEOUT1), True) 1626 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 1627 # self.assertEqual(event.is_set(), True) 1628 1629 event.clear() 1630 1631 #self.assertEqual(event.is_set(), False) 1632 1633 p = self.Process(target=self._test_event, args=(event,)) 1634 p.daemon = True 1635 p.start() 1636 self.assertEqual(wait(), True) 1637 p.join() 1638 1639# 1640# Tests for Barrier - adapted from tests in test/lock_tests.py 1641# 1642 1643# Many of the tests for threading.Barrier use a list as an atomic 1644# counter: a value is appended to increment the counter, and the 1645# length of the list gives the value. We use the class DummyList 1646# for the same purpose. 1647 1648class _DummyList(object): 1649 1650 def __init__(self): 1651 wrapper = multiprocessing.heap.BufferWrapper(struct.calcsize('i')) 1652 lock = multiprocessing.Lock() 1653 self.__setstate__((wrapper, lock)) 1654 self._lengthbuf[0] = 0 1655 1656 def __setstate__(self, state): 1657 (self._wrapper, self._lock) = state 1658 self._lengthbuf = self._wrapper.create_memoryview().cast('i') 1659 1660 def __getstate__(self): 1661 return (self._wrapper, self._lock) 1662 1663 def append(self, _): 1664 with self._lock: 1665 self._lengthbuf[0] += 1 1666 1667 def __len__(self): 1668 with self._lock: 1669 return self._lengthbuf[0] 1670 1671def _wait(): 1672 # A crude wait/yield function not relying on synchronization primitives. 1673 time.sleep(0.01) 1674 1675 1676class Bunch(object): 1677 """ 1678 A bunch of threads. 1679 """ 1680 def __init__(self, namespace, f, args, n, wait_before_exit=False): 1681 """ 1682 Construct a bunch of `n` threads running the same function `f`. 1683 If `wait_before_exit` is True, the threads won't terminate until 1684 do_finish() is called. 1685 """ 1686 self.f = f 1687 self.args = args 1688 self.n = n 1689 self.started = namespace.DummyList() 1690 self.finished = namespace.DummyList() 1691 self._can_exit = namespace.Event() 1692 if not wait_before_exit: 1693 self._can_exit.set() 1694 1695 threads = [] 1696 for i in range(n): 1697 p = namespace.Process(target=self.task) 1698 p.daemon = True 1699 p.start() 1700 threads.append(p) 1701 1702 def finalize(threads): 1703 for p in threads: 1704 p.join() 1705 1706 self._finalizer = weakref.finalize(self, finalize, threads) 1707 1708 def task(self): 1709 pid = os.getpid() 1710 self.started.append(pid) 1711 try: 1712 self.f(*self.args) 1713 finally: 1714 self.finished.append(pid) 1715 self._can_exit.wait(30) 1716 assert self._can_exit.is_set() 1717 1718 def wait_for_started(self): 1719 while len(self.started) < self.n: 1720 _wait() 1721 1722 def wait_for_finished(self): 1723 while len(self.finished) < self.n: 1724 _wait() 1725 1726 def do_finish(self): 1727 self._can_exit.set() 1728 1729 def close(self): 1730 self._finalizer() 1731 1732 1733class AppendTrue(object): 1734 def __init__(self, obj): 1735 self.obj = obj 1736 def __call__(self): 1737 self.obj.append(True) 1738 1739 1740class _TestBarrier(BaseTestCase): 1741 """ 1742 Tests for Barrier objects. 1743 """ 1744 N = 5 1745 defaultTimeout = 30.0 # XXX Slow Windows buildbots need generous timeout 1746 1747 def setUp(self): 1748 self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout) 1749 1750 def tearDown(self): 1751 self.barrier.abort() 1752 self.barrier = None 1753 1754 def DummyList(self): 1755 if self.TYPE == 'threads': 1756 return [] 1757 elif self.TYPE == 'manager': 1758 return self.manager.list() 1759 else: 1760 return _DummyList() 1761 1762 def run_threads(self, f, args): 1763 b = Bunch(self, f, args, self.N-1) 1764 try: 1765 f(*args) 1766 b.wait_for_finished() 1767 finally: 1768 b.close() 1769 1770 @classmethod 1771 def multipass(cls, barrier, results, n): 1772 m = barrier.parties 1773 assert m == cls.N 1774 for i in range(n): 1775 results[0].append(True) 1776 assert len(results[1]) == i * m 1777 barrier.wait() 1778 results[1].append(True) 1779 assert len(results[0]) == (i + 1) * m 1780 barrier.wait() 1781 try: 1782 assert barrier.n_waiting == 0 1783 except NotImplementedError: 1784 pass 1785 assert not barrier.broken 1786 1787 def test_barrier(self, passes=1): 1788 """ 1789 Test that a barrier is passed in lockstep 1790 """ 1791 results = [self.DummyList(), self.DummyList()] 1792 self.run_threads(self.multipass, (self.barrier, results, passes)) 1793 1794 def test_barrier_10(self): 1795 """ 1796 Test that a barrier works for 10 consecutive runs 1797 """ 1798 return self.test_barrier(10) 1799 1800 @classmethod 1801 def _test_wait_return_f(cls, barrier, queue): 1802 res = barrier.wait() 1803 queue.put(res) 1804 1805 def test_wait_return(self): 1806 """ 1807 test the return value from barrier.wait 1808 """ 1809 queue = self.Queue() 1810 self.run_threads(self._test_wait_return_f, (self.barrier, queue)) 1811 results = [queue.get() for i in range(self.N)] 1812 self.assertEqual(results.count(0), 1) 1813 close_queue(queue) 1814 1815 @classmethod 1816 def _test_action_f(cls, barrier, results): 1817 barrier.wait() 1818 if len(results) != 1: 1819 raise RuntimeError 1820 1821 def test_action(self): 1822 """ 1823 Test the 'action' callback 1824 """ 1825 results = self.DummyList() 1826 barrier = self.Barrier(self.N, action=AppendTrue(results)) 1827 self.run_threads(self._test_action_f, (barrier, results)) 1828 self.assertEqual(len(results), 1) 1829 1830 @classmethod 1831 def _test_abort_f(cls, barrier, results1, results2): 1832 try: 1833 i = barrier.wait() 1834 if i == cls.N//2: 1835 raise RuntimeError 1836 barrier.wait() 1837 results1.append(True) 1838 except threading.BrokenBarrierError: 1839 results2.append(True) 1840 except RuntimeError: 1841 barrier.abort() 1842 1843 def test_abort(self): 1844 """ 1845 Test that an abort will put the barrier in a broken state 1846 """ 1847 results1 = self.DummyList() 1848 results2 = self.DummyList() 1849 self.run_threads(self._test_abort_f, 1850 (self.barrier, results1, results2)) 1851 self.assertEqual(len(results1), 0) 1852 self.assertEqual(len(results2), self.N-1) 1853 self.assertTrue(self.barrier.broken) 1854 1855 @classmethod 1856 def _test_reset_f(cls, barrier, results1, results2, results3): 1857 i = barrier.wait() 1858 if i == cls.N//2: 1859 # Wait until the other threads are all in the barrier. 1860 while barrier.n_waiting < cls.N-1: 1861 time.sleep(0.001) 1862 barrier.reset() 1863 else: 1864 try: 1865 barrier.wait() 1866 results1.append(True) 1867 except threading.BrokenBarrierError: 1868 results2.append(True) 1869 # Now, pass the barrier again 1870 barrier.wait() 1871 results3.append(True) 1872 1873 def test_reset(self): 1874 """ 1875 Test that a 'reset' on a barrier frees the waiting threads 1876 """ 1877 results1 = self.DummyList() 1878 results2 = self.DummyList() 1879 results3 = self.DummyList() 1880 self.run_threads(self._test_reset_f, 1881 (self.barrier, results1, results2, results3)) 1882 self.assertEqual(len(results1), 0) 1883 self.assertEqual(len(results2), self.N-1) 1884 self.assertEqual(len(results3), self.N) 1885 1886 @classmethod 1887 def _test_abort_and_reset_f(cls, barrier, barrier2, 1888 results1, results2, results3): 1889 try: 1890 i = barrier.wait() 1891 if i == cls.N//2: 1892 raise RuntimeError 1893 barrier.wait() 1894 results1.append(True) 1895 except threading.BrokenBarrierError: 1896 results2.append(True) 1897 except RuntimeError: 1898 barrier.abort() 1899 # Synchronize and reset the barrier. Must synchronize first so 1900 # that everyone has left it when we reset, and after so that no 1901 # one enters it before the reset. 1902 if barrier2.wait() == cls.N//2: 1903 barrier.reset() 1904 barrier2.wait() 1905 barrier.wait() 1906 results3.append(True) 1907 1908 def test_abort_and_reset(self): 1909 """ 1910 Test that a barrier can be reset after being broken. 1911 """ 1912 results1 = self.DummyList() 1913 results2 = self.DummyList() 1914 results3 = self.DummyList() 1915 barrier2 = self.Barrier(self.N) 1916 1917 self.run_threads(self._test_abort_and_reset_f, 1918 (self.barrier, barrier2, results1, results2, results3)) 1919 self.assertEqual(len(results1), 0) 1920 self.assertEqual(len(results2), self.N-1) 1921 self.assertEqual(len(results3), self.N) 1922 1923 @classmethod 1924 def _test_timeout_f(cls, barrier, results): 1925 i = barrier.wait() 1926 if i == cls.N//2: 1927 # One thread is late! 1928 time.sleep(1.0) 1929 try: 1930 barrier.wait(0.5) 1931 except threading.BrokenBarrierError: 1932 results.append(True) 1933 1934 def test_timeout(self): 1935 """ 1936 Test wait(timeout) 1937 """ 1938 results = self.DummyList() 1939 self.run_threads(self._test_timeout_f, (self.barrier, results)) 1940 self.assertEqual(len(results), self.barrier.parties) 1941 1942 @classmethod 1943 def _test_default_timeout_f(cls, barrier, results): 1944 i = barrier.wait(cls.defaultTimeout) 1945 if i == cls.N//2: 1946 # One thread is later than the default timeout 1947 time.sleep(1.0) 1948 try: 1949 barrier.wait() 1950 except threading.BrokenBarrierError: 1951 results.append(True) 1952 1953 def test_default_timeout(self): 1954 """ 1955 Test the barrier's default timeout 1956 """ 1957 barrier = self.Barrier(self.N, timeout=0.5) 1958 results = self.DummyList() 1959 self.run_threads(self._test_default_timeout_f, (barrier, results)) 1960 self.assertEqual(len(results), barrier.parties) 1961 1962 def test_single_thread(self): 1963 b = self.Barrier(1) 1964 b.wait() 1965 b.wait() 1966 1967 @classmethod 1968 def _test_thousand_f(cls, barrier, passes, conn, lock): 1969 for i in range(passes): 1970 barrier.wait() 1971 with lock: 1972 conn.send(i) 1973 1974 def test_thousand(self): 1975 if self.TYPE == 'manager': 1976 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1977 passes = 1000 1978 lock = self.Lock() 1979 conn, child_conn = self.Pipe(False) 1980 for j in range(self.N): 1981 p = self.Process(target=self._test_thousand_f, 1982 args=(self.barrier, passes, child_conn, lock)) 1983 p.start() 1984 self.addCleanup(p.join) 1985 1986 for i in range(passes): 1987 for j in range(self.N): 1988 self.assertEqual(conn.recv(), i) 1989 1990# 1991# 1992# 1993 1994class _TestValue(BaseTestCase): 1995 1996 ALLOWED_TYPES = ('processes',) 1997 1998 codes_values = [ 1999 ('i', 4343, 24234), 2000 ('d', 3.625, -4.25), 2001 ('h', -232, 234), 2002 ('q', 2 ** 33, 2 ** 34), 2003 ('c', latin('x'), latin('y')) 2004 ] 2005 2006 def setUp(self): 2007 if not HAS_SHAREDCTYPES: 2008 self.skipTest("requires multiprocessing.sharedctypes") 2009 2010 @classmethod 2011 def _test(cls, values): 2012 for sv, cv in zip(values, cls.codes_values): 2013 sv.value = cv[2] 2014 2015 2016 def test_value(self, raw=False): 2017 if raw: 2018 values = [self.RawValue(code, value) 2019 for code, value, _ in self.codes_values] 2020 else: 2021 values = [self.Value(code, value) 2022 for code, value, _ in self.codes_values] 2023 2024 for sv, cv in zip(values, self.codes_values): 2025 self.assertEqual(sv.value, cv[1]) 2026 2027 proc = self.Process(target=self._test, args=(values,)) 2028 proc.daemon = True 2029 proc.start() 2030 proc.join() 2031 2032 for sv, cv in zip(values, self.codes_values): 2033 self.assertEqual(sv.value, cv[2]) 2034 2035 def test_rawvalue(self): 2036 self.test_value(raw=True) 2037 2038 def test_getobj_getlock(self): 2039 val1 = self.Value('i', 5) 2040 lock1 = val1.get_lock() 2041 obj1 = val1.get_obj() 2042 2043 val2 = self.Value('i', 5, lock=None) 2044 lock2 = val2.get_lock() 2045 obj2 = val2.get_obj() 2046 2047 lock = self.Lock() 2048 val3 = self.Value('i', 5, lock=lock) 2049 lock3 = val3.get_lock() 2050 obj3 = val3.get_obj() 2051 self.assertEqual(lock, lock3) 2052 2053 arr4 = self.Value('i', 5, lock=False) 2054 self.assertFalse(hasattr(arr4, 'get_lock')) 2055 self.assertFalse(hasattr(arr4, 'get_obj')) 2056 2057 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue') 2058 2059 arr5 = self.RawValue('i', 5) 2060 self.assertFalse(hasattr(arr5, 'get_lock')) 2061 self.assertFalse(hasattr(arr5, 'get_obj')) 2062 2063 2064class _TestArray(BaseTestCase): 2065 2066 ALLOWED_TYPES = ('processes',) 2067 2068 @classmethod 2069 def f(cls, seq): 2070 for i in range(1, len(seq)): 2071 seq[i] += seq[i-1] 2072 2073 @unittest.skipIf(c_int is None, "requires _ctypes") 2074 def test_array(self, raw=False): 2075 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831] 2076 if raw: 2077 arr = self.RawArray('i', seq) 2078 else: 2079 arr = self.Array('i', seq) 2080 2081 self.assertEqual(len(arr), len(seq)) 2082 self.assertEqual(arr[3], seq[3]) 2083 self.assertEqual(list(arr[2:7]), list(seq[2:7])) 2084 2085 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4]) 2086 2087 self.assertEqual(list(arr[:]), seq) 2088 2089 self.f(seq) 2090 2091 p = self.Process(target=self.f, args=(arr,)) 2092 p.daemon = True 2093 p.start() 2094 p.join() 2095 2096 self.assertEqual(list(arr[:]), seq) 2097 2098 @unittest.skipIf(c_int is None, "requires _ctypes") 2099 def test_array_from_size(self): 2100 size = 10 2101 # Test for zeroing (see issue #11675). 2102 # The repetition below strengthens the test by increasing the chances 2103 # of previously allocated non-zero memory being used for the new array 2104 # on the 2nd and 3rd loops. 2105 for _ in range(3): 2106 arr = self.Array('i', size) 2107 self.assertEqual(len(arr), size) 2108 self.assertEqual(list(arr), [0] * size) 2109 arr[:] = range(10) 2110 self.assertEqual(list(arr), list(range(10))) 2111 del arr 2112 2113 @unittest.skipIf(c_int is None, "requires _ctypes") 2114 def test_rawarray(self): 2115 self.test_array(raw=True) 2116 2117 @unittest.skipIf(c_int is None, "requires _ctypes") 2118 def test_getobj_getlock_obj(self): 2119 arr1 = self.Array('i', list(range(10))) 2120 lock1 = arr1.get_lock() 2121 obj1 = arr1.get_obj() 2122 2123 arr2 = self.Array('i', list(range(10)), lock=None) 2124 lock2 = arr2.get_lock() 2125 obj2 = arr2.get_obj() 2126 2127 lock = self.Lock() 2128 arr3 = self.Array('i', list(range(10)), lock=lock) 2129 lock3 = arr3.get_lock() 2130 obj3 = arr3.get_obj() 2131 self.assertEqual(lock, lock3) 2132 2133 arr4 = self.Array('i', range(10), lock=False) 2134 self.assertFalse(hasattr(arr4, 'get_lock')) 2135 self.assertFalse(hasattr(arr4, 'get_obj')) 2136 self.assertRaises(AttributeError, 2137 self.Array, 'i', range(10), lock='notalock') 2138 2139 arr5 = self.RawArray('i', range(10)) 2140 self.assertFalse(hasattr(arr5, 'get_lock')) 2141 self.assertFalse(hasattr(arr5, 'get_obj')) 2142 2143# 2144# 2145# 2146 2147class _TestContainers(BaseTestCase): 2148 2149 ALLOWED_TYPES = ('manager',) 2150 2151 def test_list(self): 2152 a = self.list(list(range(10))) 2153 self.assertEqual(a[:], list(range(10))) 2154 2155 b = self.list() 2156 self.assertEqual(b[:], []) 2157 2158 b.extend(list(range(5))) 2159 self.assertEqual(b[:], list(range(5))) 2160 2161 self.assertEqual(b[2], 2) 2162 self.assertEqual(b[2:10], [2,3,4]) 2163 2164 b *= 2 2165 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]) 2166 2167 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6]) 2168 2169 self.assertEqual(a[:], list(range(10))) 2170 2171 d = [a, b] 2172 e = self.list(d) 2173 self.assertEqual( 2174 [element[:] for element in e], 2175 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]] 2176 ) 2177 2178 f = self.list([a]) 2179 a.append('hello') 2180 self.assertEqual(f[0][:], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']) 2181 2182 def test_list_iter(self): 2183 a = self.list(list(range(10))) 2184 it = iter(a) 2185 self.assertEqual(list(it), list(range(10))) 2186 self.assertEqual(list(it), []) # exhausted 2187 # list modified during iteration 2188 it = iter(a) 2189 a[0] = 100 2190 self.assertEqual(next(it), 100) 2191 2192 def test_list_proxy_in_list(self): 2193 a = self.list([self.list(range(3)) for _i in range(3)]) 2194 self.assertEqual([inner[:] for inner in a], [[0, 1, 2]] * 3) 2195 2196 a[0][-1] = 55 2197 self.assertEqual(a[0][:], [0, 1, 55]) 2198 for i in range(1, 3): 2199 self.assertEqual(a[i][:], [0, 1, 2]) 2200 2201 self.assertEqual(a[1].pop(), 2) 2202 self.assertEqual(len(a[1]), 2) 2203 for i in range(0, 3, 2): 2204 self.assertEqual(len(a[i]), 3) 2205 2206 del a 2207 2208 b = self.list() 2209 b.append(b) 2210 del b 2211 2212 def test_dict(self): 2213 d = self.dict() 2214 indices = list(range(65, 70)) 2215 for i in indices: 2216 d[i] = chr(i) 2217 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices)) 2218 self.assertEqual(sorted(d.keys()), indices) 2219 self.assertEqual(sorted(d.values()), [chr(i) for i in indices]) 2220 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices]) 2221 2222 def test_dict_iter(self): 2223 d = self.dict() 2224 indices = list(range(65, 70)) 2225 for i in indices: 2226 d[i] = chr(i) 2227 it = iter(d) 2228 self.assertEqual(list(it), indices) 2229 self.assertEqual(list(it), []) # exhausted 2230 # dictionary changed size during iteration 2231 it = iter(d) 2232 d.clear() 2233 self.assertRaises(RuntimeError, next, it) 2234 2235 def test_dict_proxy_nested(self): 2236 pets = self.dict(ferrets=2, hamsters=4) 2237 supplies = self.dict(water=10, feed=3) 2238 d = self.dict(pets=pets, supplies=supplies) 2239 2240 self.assertEqual(supplies['water'], 10) 2241 self.assertEqual(d['supplies']['water'], 10) 2242 2243 d['supplies']['blankets'] = 5 2244 self.assertEqual(supplies['blankets'], 5) 2245 self.assertEqual(d['supplies']['blankets'], 5) 2246 2247 d['supplies']['water'] = 7 2248 self.assertEqual(supplies['water'], 7) 2249 self.assertEqual(d['supplies']['water'], 7) 2250 2251 del pets 2252 del supplies 2253 self.assertEqual(d['pets']['ferrets'], 2) 2254 d['supplies']['blankets'] = 11 2255 self.assertEqual(d['supplies']['blankets'], 11) 2256 2257 pets = d['pets'] 2258 supplies = d['supplies'] 2259 supplies['water'] = 7 2260 self.assertEqual(supplies['water'], 7) 2261 self.assertEqual(d['supplies']['water'], 7) 2262 2263 d.clear() 2264 self.assertEqual(len(d), 0) 2265 self.assertEqual(supplies['water'], 7) 2266 self.assertEqual(pets['hamsters'], 4) 2267 2268 l = self.list([pets, supplies]) 2269 l[0]['marmots'] = 1 2270 self.assertEqual(pets['marmots'], 1) 2271 self.assertEqual(l[0]['marmots'], 1) 2272 2273 del pets 2274 del supplies 2275 self.assertEqual(l[0]['marmots'], 1) 2276 2277 outer = self.list([[88, 99], l]) 2278 self.assertIsInstance(outer[0], list) # Not a ListProxy 2279 self.assertEqual(outer[-1][-1]['feed'], 3) 2280 2281 def test_namespace(self): 2282 n = self.Namespace() 2283 n.name = 'Bob' 2284 n.job = 'Builder' 2285 n._hidden = 'hidden' 2286 self.assertEqual((n.name, n.job), ('Bob', 'Builder')) 2287 del n.job 2288 self.assertEqual(str(n), "Namespace(name='Bob')") 2289 self.assertTrue(hasattr(n, 'name')) 2290 self.assertTrue(not hasattr(n, 'job')) 2291 2292# 2293# 2294# 2295 2296def sqr(x, wait=0.0): 2297 time.sleep(wait) 2298 return x*x 2299 2300def mul(x, y): 2301 return x*y 2302 2303def raise_large_valuerror(wait): 2304 time.sleep(wait) 2305 raise ValueError("x" * 1024**2) 2306 2307def identity(x): 2308 return x 2309 2310class CountedObject(object): 2311 n_instances = 0 2312 2313 def __new__(cls): 2314 cls.n_instances += 1 2315 return object.__new__(cls) 2316 2317 def __del__(self): 2318 type(self).n_instances -= 1 2319 2320class SayWhenError(ValueError): pass 2321 2322def exception_throwing_generator(total, when): 2323 if when == -1: 2324 raise SayWhenError("Somebody said when") 2325 for i in range(total): 2326 if i == when: 2327 raise SayWhenError("Somebody said when") 2328 yield i 2329 2330 2331class _TestPool(BaseTestCase): 2332 2333 @classmethod 2334 def setUpClass(cls): 2335 super().setUpClass() 2336 cls.pool = cls.Pool(4) 2337 2338 @classmethod 2339 def tearDownClass(cls): 2340 cls.pool.terminate() 2341 cls.pool.join() 2342 cls.pool = None 2343 super().tearDownClass() 2344 2345 def test_apply(self): 2346 papply = self.pool.apply 2347 self.assertEqual(papply(sqr, (5,)), sqr(5)) 2348 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3)) 2349 2350 def test_map(self): 2351 pmap = self.pool.map 2352 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10))))) 2353 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20), 2354 list(map(sqr, list(range(100))))) 2355 2356 def test_starmap(self): 2357 psmap = self.pool.starmap 2358 tuples = list(zip(range(10), range(9,-1, -1))) 2359 self.assertEqual(psmap(mul, tuples), 2360 list(itertools.starmap(mul, tuples))) 2361 tuples = list(zip(range(100), range(99,-1, -1))) 2362 self.assertEqual(psmap(mul, tuples, chunksize=20), 2363 list(itertools.starmap(mul, tuples))) 2364 2365 def test_starmap_async(self): 2366 tuples = list(zip(range(100), range(99,-1, -1))) 2367 self.assertEqual(self.pool.starmap_async(mul, tuples).get(), 2368 list(itertools.starmap(mul, tuples))) 2369 2370 def test_map_async(self): 2371 self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(), 2372 list(map(sqr, list(range(10))))) 2373 2374 def test_map_async_callbacks(self): 2375 call_args = self.manager.list() if self.TYPE == 'manager' else [] 2376 self.pool.map_async(int, ['1'], 2377 callback=call_args.append, 2378 error_callback=call_args.append).wait() 2379 self.assertEqual(1, len(call_args)) 2380 self.assertEqual([1], call_args[0]) 2381 self.pool.map_async(int, ['a'], 2382 callback=call_args.append, 2383 error_callback=call_args.append).wait() 2384 self.assertEqual(2, len(call_args)) 2385 self.assertIsInstance(call_args[1], ValueError) 2386 2387 def test_map_unplicklable(self): 2388 # Issue #19425 -- failure to pickle should not cause a hang 2389 if self.TYPE == 'threads': 2390 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 2391 class A(object): 2392 def __reduce__(self): 2393 raise RuntimeError('cannot pickle') 2394 with self.assertRaises(RuntimeError): 2395 self.pool.map(sqr, [A()]*10) 2396 2397 def test_map_chunksize(self): 2398 try: 2399 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1) 2400 except multiprocessing.TimeoutError: 2401 self.fail("pool.map_async with chunksize stalled on null list") 2402 2403 def test_map_handle_iterable_exception(self): 2404 if self.TYPE == 'manager': 2405 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 2406 2407 # SayWhenError seen at the very first of the iterable 2408 with self.assertRaises(SayWhenError): 2409 self.pool.map(sqr, exception_throwing_generator(1, -1), 1) 2410 # again, make sure it's reentrant 2411 with self.assertRaises(SayWhenError): 2412 self.pool.map(sqr, exception_throwing_generator(1, -1), 1) 2413 2414 with self.assertRaises(SayWhenError): 2415 self.pool.map(sqr, exception_throwing_generator(10, 3), 1) 2416 2417 class SpecialIterable: 2418 def __iter__(self): 2419 return self 2420 def __next__(self): 2421 raise SayWhenError 2422 def __len__(self): 2423 return 1 2424 with self.assertRaises(SayWhenError): 2425 self.pool.map(sqr, SpecialIterable(), 1) 2426 with self.assertRaises(SayWhenError): 2427 self.pool.map(sqr, SpecialIterable(), 1) 2428 2429 def test_async(self): 2430 res = self.pool.apply_async(sqr, (7, TIMEOUT1,)) 2431 get = TimingWrapper(res.get) 2432 self.assertEqual(get(), 49) 2433 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1) 2434 2435 def test_async_timeout(self): 2436 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0)) 2437 get = TimingWrapper(res.get) 2438 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2) 2439 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2) 2440 2441 def test_imap(self): 2442 it = self.pool.imap(sqr, list(range(10))) 2443 self.assertEqual(list(it), list(map(sqr, list(range(10))))) 2444 2445 it = self.pool.imap(sqr, list(range(10))) 2446 for i in range(10): 2447 self.assertEqual(next(it), i*i) 2448 self.assertRaises(StopIteration, it.__next__) 2449 2450 it = self.pool.imap(sqr, list(range(1000)), chunksize=100) 2451 for i in range(1000): 2452 self.assertEqual(next(it), i*i) 2453 self.assertRaises(StopIteration, it.__next__) 2454 2455 def test_imap_handle_iterable_exception(self): 2456 if self.TYPE == 'manager': 2457 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 2458 2459 # SayWhenError seen at the very first of the iterable 2460 it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1) 2461 self.assertRaises(SayWhenError, it.__next__) 2462 # again, make sure it's reentrant 2463 it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1) 2464 self.assertRaises(SayWhenError, it.__next__) 2465 2466 it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1) 2467 for i in range(3): 2468 self.assertEqual(next(it), i*i) 2469 self.assertRaises(SayWhenError, it.__next__) 2470 2471 # SayWhenError seen at start of problematic chunk's results 2472 it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2) 2473 for i in range(6): 2474 self.assertEqual(next(it), i*i) 2475 self.assertRaises(SayWhenError, it.__next__) 2476 it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4) 2477 for i in range(4): 2478 self.assertEqual(next(it), i*i) 2479 self.assertRaises(SayWhenError, it.__next__) 2480 2481 def test_imap_unordered(self): 2482 it = self.pool.imap_unordered(sqr, list(range(10))) 2483 self.assertEqual(sorted(it), list(map(sqr, list(range(10))))) 2484 2485 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=100) 2486 self.assertEqual(sorted(it), list(map(sqr, list(range(1000))))) 2487 2488 def test_imap_unordered_handle_iterable_exception(self): 2489 if self.TYPE == 'manager': 2490 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 2491 2492 # SayWhenError seen at the very first of the iterable 2493 it = self.pool.imap_unordered(sqr, 2494 exception_throwing_generator(1, -1), 2495 1) 2496 self.assertRaises(SayWhenError, it.__next__) 2497 # again, make sure it's reentrant 2498 it = self.pool.imap_unordered(sqr, 2499 exception_throwing_generator(1, -1), 2500 1) 2501 self.assertRaises(SayWhenError, it.__next__) 2502 2503 it = self.pool.imap_unordered(sqr, 2504 exception_throwing_generator(10, 3), 2505 1) 2506 expected_values = list(map(sqr, list(range(10)))) 2507 with self.assertRaises(SayWhenError): 2508 # imap_unordered makes it difficult to anticipate the SayWhenError 2509 for i in range(10): 2510 value = next(it) 2511 self.assertIn(value, expected_values) 2512 expected_values.remove(value) 2513 2514 it = self.pool.imap_unordered(sqr, 2515 exception_throwing_generator(20, 7), 2516 2) 2517 expected_values = list(map(sqr, list(range(20)))) 2518 with self.assertRaises(SayWhenError): 2519 for i in range(20): 2520 value = next(it) 2521 self.assertIn(value, expected_values) 2522 expected_values.remove(value) 2523 2524 def test_make_pool(self): 2525 expected_error = (RemoteError if self.TYPE == 'manager' 2526 else ValueError) 2527 2528 self.assertRaises(expected_error, self.Pool, -1) 2529 self.assertRaises(expected_error, self.Pool, 0) 2530 2531 if self.TYPE != 'manager': 2532 p = self.Pool(3) 2533 try: 2534 self.assertEqual(3, len(p._pool)) 2535 finally: 2536 p.close() 2537 p.join() 2538 2539 def test_terminate(self): 2540 result = self.pool.map_async( 2541 time.sleep, [0.1 for i in range(10000)], chunksize=1 2542 ) 2543 self.pool.terminate() 2544 join = TimingWrapper(self.pool.join) 2545 join() 2546 # Sanity check the pool didn't wait for all tasks to finish 2547 self.assertLess(join.elapsed, 2.0) 2548 2549 def test_empty_iterable(self): 2550 # See Issue 12157 2551 p = self.Pool(1) 2552 2553 self.assertEqual(p.map(sqr, []), []) 2554 self.assertEqual(list(p.imap(sqr, [])), []) 2555 self.assertEqual(list(p.imap_unordered(sqr, [])), []) 2556 self.assertEqual(p.map_async(sqr, []).get(), []) 2557 2558 p.close() 2559 p.join() 2560 2561 def test_context(self): 2562 if self.TYPE == 'processes': 2563 L = list(range(10)) 2564 expected = [sqr(i) for i in L] 2565 with self.Pool(2) as p: 2566 r = p.map_async(sqr, L) 2567 self.assertEqual(r.get(), expected) 2568 p.join() 2569 self.assertRaises(ValueError, p.map_async, sqr, L) 2570 2571 @classmethod 2572 def _test_traceback(cls): 2573 raise RuntimeError(123) # some comment 2574 2575 def test_traceback(self): 2576 # We want ensure that the traceback from the child process is 2577 # contained in the traceback raised in the main process. 2578 if self.TYPE == 'processes': 2579 with self.Pool(1) as p: 2580 try: 2581 p.apply(self._test_traceback) 2582 except Exception as e: 2583 exc = e 2584 else: 2585 self.fail('expected RuntimeError') 2586 p.join() 2587 self.assertIs(type(exc), RuntimeError) 2588 self.assertEqual(exc.args, (123,)) 2589 cause = exc.__cause__ 2590 self.assertIs(type(cause), multiprocessing.pool.RemoteTraceback) 2591 self.assertIn('raise RuntimeError(123) # some comment', cause.tb) 2592 2593 with test.support.captured_stderr() as f1: 2594 try: 2595 raise exc 2596 except RuntimeError: 2597 sys.excepthook(*sys.exc_info()) 2598 self.assertIn('raise RuntimeError(123) # some comment', 2599 f1.getvalue()) 2600 # _helper_reraises_exception should not make the error 2601 # a remote exception 2602 with self.Pool(1) as p: 2603 try: 2604 p.map(sqr, exception_throwing_generator(1, -1), 1) 2605 except Exception as e: 2606 exc = e 2607 else: 2608 self.fail('expected SayWhenError') 2609 self.assertIs(type(exc), SayWhenError) 2610 self.assertIs(exc.__cause__, None) 2611 p.join() 2612 2613 @classmethod 2614 def _test_wrapped_exception(cls): 2615 raise RuntimeError('foo') 2616 2617 def test_wrapped_exception(self): 2618 # Issue #20980: Should not wrap exception when using thread pool 2619 with self.Pool(1) as p: 2620 with self.assertRaises(RuntimeError): 2621 p.apply(self._test_wrapped_exception) 2622 p.join() 2623 2624 def test_map_no_failfast(self): 2625 # Issue #23992: the fail-fast behaviour when an exception is raised 2626 # during map() would make Pool.join() deadlock, because a worker 2627 # process would fill the result queue (after the result handler thread 2628 # terminated, hence not draining it anymore). 2629 2630 t_start = time.monotonic() 2631 2632 with self.assertRaises(ValueError): 2633 with self.Pool(2) as p: 2634 try: 2635 p.map(raise_large_valuerror, [0, 1]) 2636 finally: 2637 time.sleep(0.5) 2638 p.close() 2639 p.join() 2640 2641 # check that we indeed waited for all jobs 2642 self.assertGreater(time.monotonic() - t_start, 0.9) 2643 2644 def test_release_task_refs(self): 2645 # Issue #29861: task arguments and results should not be kept 2646 # alive after we are done with them. 2647 objs = [CountedObject() for i in range(10)] 2648 refs = [weakref.ref(o) for o in objs] 2649 self.pool.map(identity, objs) 2650 2651 del objs 2652 time.sleep(DELTA) # let threaded cleanup code run 2653 self.assertEqual(set(wr() for wr in refs), {None}) 2654 # With a process pool, copies of the objects are returned, check 2655 # they were released too. 2656 self.assertEqual(CountedObject.n_instances, 0) 2657 2658 def test_enter(self): 2659 if self.TYPE == 'manager': 2660 self.skipTest("test not applicable to manager") 2661 2662 pool = self.Pool(1) 2663 with pool: 2664 pass 2665 # call pool.terminate() 2666 # pool is no longer running 2667 2668 with self.assertRaises(ValueError): 2669 # bpo-35477: pool.__enter__() fails if the pool is not running 2670 with pool: 2671 pass 2672 pool.join() 2673 2674 def test_resource_warning(self): 2675 if self.TYPE == 'manager': 2676 self.skipTest("test not applicable to manager") 2677 2678 pool = self.Pool(1) 2679 pool.terminate() 2680 pool.join() 2681 2682 # force state to RUN to emit ResourceWarning in __del__() 2683 pool._state = multiprocessing.pool.RUN 2684 2685 with support.check_warnings(('unclosed running multiprocessing pool', 2686 ResourceWarning)): 2687 pool = None 2688 support.gc_collect() 2689 2690def raising(): 2691 raise KeyError("key") 2692 2693def unpickleable_result(): 2694 return lambda: 42 2695 2696class _TestPoolWorkerErrors(BaseTestCase): 2697 ALLOWED_TYPES = ('processes', ) 2698 2699 def test_async_error_callback(self): 2700 p = multiprocessing.Pool(2) 2701 2702 scratchpad = [None] 2703 def errback(exc): 2704 scratchpad[0] = exc 2705 2706 res = p.apply_async(raising, error_callback=errback) 2707 self.assertRaises(KeyError, res.get) 2708 self.assertTrue(scratchpad[0]) 2709 self.assertIsInstance(scratchpad[0], KeyError) 2710 2711 p.close() 2712 p.join() 2713 2714 def test_unpickleable_result(self): 2715 from multiprocessing.pool import MaybeEncodingError 2716 p = multiprocessing.Pool(2) 2717 2718 # Make sure we don't lose pool processes because of encoding errors. 2719 for iteration in range(20): 2720 2721 scratchpad = [None] 2722 def errback(exc): 2723 scratchpad[0] = exc 2724 2725 res = p.apply_async(unpickleable_result, error_callback=errback) 2726 self.assertRaises(MaybeEncodingError, res.get) 2727 wrapped = scratchpad[0] 2728 self.assertTrue(wrapped) 2729 self.assertIsInstance(scratchpad[0], MaybeEncodingError) 2730 self.assertIsNotNone(wrapped.exc) 2731 self.assertIsNotNone(wrapped.value) 2732 2733 p.close() 2734 p.join() 2735 2736class _TestPoolWorkerLifetime(BaseTestCase): 2737 ALLOWED_TYPES = ('processes', ) 2738 2739 def test_pool_worker_lifetime(self): 2740 p = multiprocessing.Pool(3, maxtasksperchild=10) 2741 self.assertEqual(3, len(p._pool)) 2742 origworkerpids = [w.pid for w in p._pool] 2743 # Run many tasks so each worker gets replaced (hopefully) 2744 results = [] 2745 for i in range(100): 2746 results.append(p.apply_async(sqr, (i, ))) 2747 # Fetch the results and verify we got the right answers, 2748 # also ensuring all the tasks have completed. 2749 for (j, res) in enumerate(results): 2750 self.assertEqual(res.get(), sqr(j)) 2751 # Refill the pool 2752 p._repopulate_pool() 2753 # Wait until all workers are alive 2754 # (countdown * DELTA = 5 seconds max startup process time) 2755 countdown = 50 2756 while countdown and not all(w.is_alive() for w in p._pool): 2757 countdown -= 1 2758 time.sleep(DELTA) 2759 finalworkerpids = [w.pid for w in p._pool] 2760 # All pids should be assigned. See issue #7805. 2761 self.assertNotIn(None, origworkerpids) 2762 self.assertNotIn(None, finalworkerpids) 2763 # Finally, check that the worker pids have changed 2764 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids)) 2765 p.close() 2766 p.join() 2767 2768 def test_pool_worker_lifetime_early_close(self): 2769 # Issue #10332: closing a pool whose workers have limited lifetimes 2770 # before all the tasks completed would make join() hang. 2771 p = multiprocessing.Pool(3, maxtasksperchild=1) 2772 results = [] 2773 for i in range(6): 2774 results.append(p.apply_async(sqr, (i, 0.3))) 2775 p.close() 2776 p.join() 2777 # check the results 2778 for (j, res) in enumerate(results): 2779 self.assertEqual(res.get(), sqr(j)) 2780 2781# 2782# Test of creating a customized manager class 2783# 2784 2785from multiprocessing.managers import BaseManager, BaseProxy, RemoteError 2786 2787class FooBar(object): 2788 def f(self): 2789 return 'f()' 2790 def g(self): 2791 raise ValueError 2792 def _h(self): 2793 return '_h()' 2794 2795def baz(): 2796 for i in range(10): 2797 yield i*i 2798 2799class IteratorProxy(BaseProxy): 2800 _exposed_ = ('__next__',) 2801 def __iter__(self): 2802 return self 2803 def __next__(self): 2804 return self._callmethod('__next__') 2805 2806class MyManager(BaseManager): 2807 pass 2808 2809MyManager.register('Foo', callable=FooBar) 2810MyManager.register('Bar', callable=FooBar, exposed=('f', '_h')) 2811MyManager.register('baz', callable=baz, proxytype=IteratorProxy) 2812 2813 2814class _TestMyManager(BaseTestCase): 2815 2816 ALLOWED_TYPES = ('manager',) 2817 2818 def test_mymanager(self): 2819 manager = MyManager() 2820 manager.start() 2821 self.common(manager) 2822 manager.shutdown() 2823 2824 # bpo-30356: BaseManager._finalize_manager() sends SIGTERM 2825 # to the manager process if it takes longer than 1 second to stop, 2826 # which happens on slow buildbots. 2827 self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM)) 2828 2829 def test_mymanager_context(self): 2830 with MyManager() as manager: 2831 self.common(manager) 2832 # bpo-30356: BaseManager._finalize_manager() sends SIGTERM 2833 # to the manager process if it takes longer than 1 second to stop, 2834 # which happens on slow buildbots. 2835 self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM)) 2836 2837 def test_mymanager_context_prestarted(self): 2838 manager = MyManager() 2839 manager.start() 2840 with manager: 2841 self.common(manager) 2842 self.assertEqual(manager._process.exitcode, 0) 2843 2844 def common(self, manager): 2845 foo = manager.Foo() 2846 bar = manager.Bar() 2847 baz = manager.baz() 2848 2849 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)] 2850 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)] 2851 2852 self.assertEqual(foo_methods, ['f', 'g']) 2853 self.assertEqual(bar_methods, ['f', '_h']) 2854 2855 self.assertEqual(foo.f(), 'f()') 2856 self.assertRaises(ValueError, foo.g) 2857 self.assertEqual(foo._callmethod('f'), 'f()') 2858 self.assertRaises(RemoteError, foo._callmethod, '_h') 2859 2860 self.assertEqual(bar.f(), 'f()') 2861 self.assertEqual(bar._h(), '_h()') 2862 self.assertEqual(bar._callmethod('f'), 'f()') 2863 self.assertEqual(bar._callmethod('_h'), '_h()') 2864 2865 self.assertEqual(list(baz), [i*i for i in range(10)]) 2866 2867 2868# 2869# Test of connecting to a remote server and using xmlrpclib for serialization 2870# 2871 2872_queue = pyqueue.Queue() 2873def get_queue(): 2874 return _queue 2875 2876class QueueManager(BaseManager): 2877 '''manager class used by server process''' 2878QueueManager.register('get_queue', callable=get_queue) 2879 2880class QueueManager2(BaseManager): 2881 '''manager class which specifies the same interface as QueueManager''' 2882QueueManager2.register('get_queue') 2883 2884 2885SERIALIZER = 'xmlrpclib' 2886 2887class _TestRemoteManager(BaseTestCase): 2888 2889 ALLOWED_TYPES = ('manager',) 2890 values = ['hello world', None, True, 2.25, 2891 'hall\xe5 v\xe4rlden', 2892 '\u043f\u0440\u0438\u0432\u0456\u0442 \u0441\u0432\u0456\u0442', 2893 b'hall\xe5 v\xe4rlden', 2894 ] 2895 result = values[:] 2896 2897 @classmethod 2898 def _putter(cls, address, authkey): 2899 manager = QueueManager2( 2900 address=address, authkey=authkey, serializer=SERIALIZER 2901 ) 2902 manager.connect() 2903 queue = manager.get_queue() 2904 # Note that xmlrpclib will deserialize object as a list not a tuple 2905 queue.put(tuple(cls.values)) 2906 2907 def test_remote(self): 2908 authkey = os.urandom(32) 2909 2910 manager = QueueManager( 2911 address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER 2912 ) 2913 manager.start() 2914 self.addCleanup(manager.shutdown) 2915 2916 p = self.Process(target=self._putter, args=(manager.address, authkey)) 2917 p.daemon = True 2918 p.start() 2919 2920 manager2 = QueueManager2( 2921 address=manager.address, authkey=authkey, serializer=SERIALIZER 2922 ) 2923 manager2.connect() 2924 queue = manager2.get_queue() 2925 2926 self.assertEqual(queue.get(), self.result) 2927 2928 # Because we are using xmlrpclib for serialization instead of 2929 # pickle this will cause a serialization error. 2930 self.assertRaises(Exception, queue.put, time.sleep) 2931 2932 # Make queue finalizer run before the server is stopped 2933 del queue 2934 2935class _TestManagerRestart(BaseTestCase): 2936 2937 @classmethod 2938 def _putter(cls, address, authkey): 2939 manager = QueueManager( 2940 address=address, authkey=authkey, serializer=SERIALIZER) 2941 manager.connect() 2942 queue = manager.get_queue() 2943 queue.put('hello world') 2944 2945 def test_rapid_restart(self): 2946 authkey = os.urandom(32) 2947 manager = QueueManager( 2948 address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER) 2949 try: 2950 srvr = manager.get_server() 2951 addr = srvr.address 2952 # Close the connection.Listener socket which gets opened as a part 2953 # of manager.get_server(). It's not needed for the test. 2954 srvr.listener.close() 2955 manager.start() 2956 2957 p = self.Process(target=self._putter, args=(manager.address, authkey)) 2958 p.start() 2959 p.join() 2960 queue = manager.get_queue() 2961 self.assertEqual(queue.get(), 'hello world') 2962 del queue 2963 finally: 2964 if hasattr(manager, "shutdown"): 2965 manager.shutdown() 2966 2967 manager = QueueManager( 2968 address=addr, authkey=authkey, serializer=SERIALIZER) 2969 try: 2970 manager.start() 2971 self.addCleanup(manager.shutdown) 2972 except OSError as e: 2973 if e.errno != errno.EADDRINUSE: 2974 raise 2975 # Retry after some time, in case the old socket was lingering 2976 # (sporadic failure on buildbots) 2977 time.sleep(1.0) 2978 manager = QueueManager( 2979 address=addr, authkey=authkey, serializer=SERIALIZER) 2980 if hasattr(manager, "shutdown"): 2981 self.addCleanup(manager.shutdown) 2982 2983# 2984# 2985# 2986 2987SENTINEL = latin('') 2988 2989class _TestConnection(BaseTestCase): 2990 2991 ALLOWED_TYPES = ('processes', 'threads') 2992 2993 @classmethod 2994 def _echo(cls, conn): 2995 for msg in iter(conn.recv_bytes, SENTINEL): 2996 conn.send_bytes(msg) 2997 conn.close() 2998 2999 def test_connection(self): 3000 conn, child_conn = self.Pipe() 3001 3002 p = self.Process(target=self._echo, args=(child_conn,)) 3003 p.daemon = True 3004 p.start() 3005 3006 seq = [1, 2.25, None] 3007 msg = latin('hello world') 3008 longmsg = msg * 10 3009 arr = array.array('i', list(range(4))) 3010 3011 if self.TYPE == 'processes': 3012 self.assertEqual(type(conn.fileno()), int) 3013 3014 self.assertEqual(conn.send(seq), None) 3015 self.assertEqual(conn.recv(), seq) 3016 3017 self.assertEqual(conn.send_bytes(msg), None) 3018 self.assertEqual(conn.recv_bytes(), msg) 3019 3020 if self.TYPE == 'processes': 3021 buffer = array.array('i', [0]*10) 3022 expected = list(arr) + [0] * (10 - len(arr)) 3023 self.assertEqual(conn.send_bytes(arr), None) 3024 self.assertEqual(conn.recv_bytes_into(buffer), 3025 len(arr) * buffer.itemsize) 3026 self.assertEqual(list(buffer), expected) 3027 3028 buffer = array.array('i', [0]*10) 3029 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr)) 3030 self.assertEqual(conn.send_bytes(arr), None) 3031 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize), 3032 len(arr) * buffer.itemsize) 3033 self.assertEqual(list(buffer), expected) 3034 3035 buffer = bytearray(latin(' ' * 40)) 3036 self.assertEqual(conn.send_bytes(longmsg), None) 3037 try: 3038 res = conn.recv_bytes_into(buffer) 3039 except multiprocessing.BufferTooShort as e: 3040 self.assertEqual(e.args, (longmsg,)) 3041 else: 3042 self.fail('expected BufferTooShort, got %s' % res) 3043 3044 poll = TimingWrapper(conn.poll) 3045 3046 self.assertEqual(poll(), False) 3047 self.assertTimingAlmostEqual(poll.elapsed, 0) 3048 3049 self.assertEqual(poll(-1), False) 3050 self.assertTimingAlmostEqual(poll.elapsed, 0) 3051 3052 self.assertEqual(poll(TIMEOUT1), False) 3053 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1) 3054 3055 conn.send(None) 3056 time.sleep(.1) 3057 3058 self.assertEqual(poll(TIMEOUT1), True) 3059 self.assertTimingAlmostEqual(poll.elapsed, 0) 3060 3061 self.assertEqual(conn.recv(), None) 3062 3063 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb 3064 conn.send_bytes(really_big_msg) 3065 self.assertEqual(conn.recv_bytes(), really_big_msg) 3066 3067 conn.send_bytes(SENTINEL) # tell child to quit 3068 child_conn.close() 3069 3070 if self.TYPE == 'processes': 3071 self.assertEqual(conn.readable, True) 3072 self.assertEqual(conn.writable, True) 3073 self.assertRaises(EOFError, conn.recv) 3074 self.assertRaises(EOFError, conn.recv_bytes) 3075 3076 p.join() 3077 3078 def test_duplex_false(self): 3079 reader, writer = self.Pipe(duplex=False) 3080 self.assertEqual(writer.send(1), None) 3081 self.assertEqual(reader.recv(), 1) 3082 if self.TYPE == 'processes': 3083 self.assertEqual(reader.readable, True) 3084 self.assertEqual(reader.writable, False) 3085 self.assertEqual(writer.readable, False) 3086 self.assertEqual(writer.writable, True) 3087 self.assertRaises(OSError, reader.send, 2) 3088 self.assertRaises(OSError, writer.recv) 3089 self.assertRaises(OSError, writer.poll) 3090 3091 def test_spawn_close(self): 3092 # We test that a pipe connection can be closed by parent 3093 # process immediately after child is spawned. On Windows this 3094 # would have sometimes failed on old versions because 3095 # child_conn would be closed before the child got a chance to 3096 # duplicate it. 3097 conn, child_conn = self.Pipe() 3098 3099 p = self.Process(target=self._echo, args=(child_conn,)) 3100 p.daemon = True 3101 p.start() 3102 child_conn.close() # this might complete before child initializes 3103 3104 msg = latin('hello') 3105 conn.send_bytes(msg) 3106 self.assertEqual(conn.recv_bytes(), msg) 3107 3108 conn.send_bytes(SENTINEL) 3109 conn.close() 3110 p.join() 3111 3112 def test_sendbytes(self): 3113 if self.TYPE != 'processes': 3114 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 3115 3116 msg = latin('abcdefghijklmnopqrstuvwxyz') 3117 a, b = self.Pipe() 3118 3119 a.send_bytes(msg) 3120 self.assertEqual(b.recv_bytes(), msg) 3121 3122 a.send_bytes(msg, 5) 3123 self.assertEqual(b.recv_bytes(), msg[5:]) 3124 3125 a.send_bytes(msg, 7, 8) 3126 self.assertEqual(b.recv_bytes(), msg[7:7+8]) 3127 3128 a.send_bytes(msg, 26) 3129 self.assertEqual(b.recv_bytes(), latin('')) 3130 3131 a.send_bytes(msg, 26, 0) 3132 self.assertEqual(b.recv_bytes(), latin('')) 3133 3134 self.assertRaises(ValueError, a.send_bytes, msg, 27) 3135 3136 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5) 3137 3138 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1) 3139 3140 self.assertRaises(ValueError, a.send_bytes, msg, -1) 3141 3142 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1) 3143 3144 @classmethod 3145 def _is_fd_assigned(cls, fd): 3146 try: 3147 os.fstat(fd) 3148 except OSError as e: 3149 if e.errno == errno.EBADF: 3150 return False 3151 raise 3152 else: 3153 return True 3154 3155 @classmethod 3156 def _writefd(cls, conn, data, create_dummy_fds=False): 3157 if create_dummy_fds: 3158 for i in range(0, 256): 3159 if not cls._is_fd_assigned(i): 3160 os.dup2(conn.fileno(), i) 3161 fd = reduction.recv_handle(conn) 3162 if msvcrt: 3163 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY) 3164 os.write(fd, data) 3165 os.close(fd) 3166 3167 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 3168 def test_fd_transfer(self): 3169 if self.TYPE != 'processes': 3170 self.skipTest("only makes sense with processes") 3171 conn, child_conn = self.Pipe(duplex=True) 3172 3173 p = self.Process(target=self._writefd, args=(child_conn, b"foo")) 3174 p.daemon = True 3175 p.start() 3176 self.addCleanup(test.support.unlink, test.support.TESTFN) 3177 with open(test.support.TESTFN, "wb") as f: 3178 fd = f.fileno() 3179 if msvcrt: 3180 fd = msvcrt.get_osfhandle(fd) 3181 reduction.send_handle(conn, fd, p.pid) 3182 p.join() 3183 with open(test.support.TESTFN, "rb") as f: 3184 self.assertEqual(f.read(), b"foo") 3185 3186 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 3187 @unittest.skipIf(sys.platform == "win32", 3188 "test semantics don't make sense on Windows") 3189 @unittest.skipIf(MAXFD <= 256, 3190 "largest assignable fd number is too small") 3191 @unittest.skipUnless(hasattr(os, "dup2"), 3192 "test needs os.dup2()") 3193 def test_large_fd_transfer(self): 3194 # With fd > 256 (issue #11657) 3195 if self.TYPE != 'processes': 3196 self.skipTest("only makes sense with processes") 3197 conn, child_conn = self.Pipe(duplex=True) 3198 3199 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True)) 3200 p.daemon = True 3201 p.start() 3202 self.addCleanup(test.support.unlink, test.support.TESTFN) 3203 with open(test.support.TESTFN, "wb") as f: 3204 fd = f.fileno() 3205 for newfd in range(256, MAXFD): 3206 if not self._is_fd_assigned(newfd): 3207 break 3208 else: 3209 self.fail("could not find an unassigned large file descriptor") 3210 os.dup2(fd, newfd) 3211 try: 3212 reduction.send_handle(conn, newfd, p.pid) 3213 finally: 3214 os.close(newfd) 3215 p.join() 3216 with open(test.support.TESTFN, "rb") as f: 3217 self.assertEqual(f.read(), b"bar") 3218 3219 @classmethod 3220 def _send_data_without_fd(self, conn): 3221 os.write(conn.fileno(), b"\0") 3222 3223 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 3224 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows") 3225 def test_missing_fd_transfer(self): 3226 # Check that exception is raised when received data is not 3227 # accompanied by a file descriptor in ancillary data. 3228 if self.TYPE != 'processes': 3229 self.skipTest("only makes sense with processes") 3230 conn, child_conn = self.Pipe(duplex=True) 3231 3232 p = self.Process(target=self._send_data_without_fd, args=(child_conn,)) 3233 p.daemon = True 3234 p.start() 3235 self.assertRaises(RuntimeError, reduction.recv_handle, conn) 3236 p.join() 3237 3238 def test_context(self): 3239 a, b = self.Pipe() 3240 3241 with a, b: 3242 a.send(1729) 3243 self.assertEqual(b.recv(), 1729) 3244 if self.TYPE == 'processes': 3245 self.assertFalse(a.closed) 3246 self.assertFalse(b.closed) 3247 3248 if self.TYPE == 'processes': 3249 self.assertTrue(a.closed) 3250 self.assertTrue(b.closed) 3251 self.assertRaises(OSError, a.recv) 3252 self.assertRaises(OSError, b.recv) 3253 3254class _TestListener(BaseTestCase): 3255 3256 ALLOWED_TYPES = ('processes',) 3257 3258 def test_multiple_bind(self): 3259 for family in self.connection.families: 3260 l = self.connection.Listener(family=family) 3261 self.addCleanup(l.close) 3262 self.assertRaises(OSError, self.connection.Listener, 3263 l.address, family) 3264 3265 def test_context(self): 3266 with self.connection.Listener() as l: 3267 with self.connection.Client(l.address) as c: 3268 with l.accept() as d: 3269 c.send(1729) 3270 self.assertEqual(d.recv(), 1729) 3271 3272 if self.TYPE == 'processes': 3273 self.assertRaises(OSError, l.accept) 3274 3275class _TestListenerClient(BaseTestCase): 3276 3277 ALLOWED_TYPES = ('processes', 'threads') 3278 3279 @classmethod 3280 def _test(cls, address): 3281 conn = cls.connection.Client(address) 3282 conn.send('hello') 3283 conn.close() 3284 3285 def test_listener_client(self): 3286 for family in self.connection.families: 3287 l = self.connection.Listener(family=family) 3288 p = self.Process(target=self._test, args=(l.address,)) 3289 p.daemon = True 3290 p.start() 3291 conn = l.accept() 3292 self.assertEqual(conn.recv(), 'hello') 3293 p.join() 3294 l.close() 3295 3296 def test_issue14725(self): 3297 l = self.connection.Listener() 3298 p = self.Process(target=self._test, args=(l.address,)) 3299 p.daemon = True 3300 p.start() 3301 time.sleep(1) 3302 # On Windows the client process should by now have connected, 3303 # written data and closed the pipe handle by now. This causes 3304 # ConnectNamdedPipe() to fail with ERROR_NO_DATA. See Issue 3305 # 14725. 3306 conn = l.accept() 3307 self.assertEqual(conn.recv(), 'hello') 3308 conn.close() 3309 p.join() 3310 l.close() 3311 3312 def test_issue16955(self): 3313 for fam in self.connection.families: 3314 l = self.connection.Listener(family=fam) 3315 c = self.connection.Client(l.address) 3316 a = l.accept() 3317 a.send_bytes(b"hello") 3318 self.assertTrue(c.poll(1)) 3319 a.close() 3320 c.close() 3321 l.close() 3322 3323class _TestPoll(BaseTestCase): 3324 3325 ALLOWED_TYPES = ('processes', 'threads') 3326 3327 def test_empty_string(self): 3328 a, b = self.Pipe() 3329 self.assertEqual(a.poll(), False) 3330 b.send_bytes(b'') 3331 self.assertEqual(a.poll(), True) 3332 self.assertEqual(a.poll(), True) 3333 3334 @classmethod 3335 def _child_strings(cls, conn, strings): 3336 for s in strings: 3337 time.sleep(0.1) 3338 conn.send_bytes(s) 3339 conn.close() 3340 3341 def test_strings(self): 3342 strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop') 3343 a, b = self.Pipe() 3344 p = self.Process(target=self._child_strings, args=(b, strings)) 3345 p.start() 3346 3347 for s in strings: 3348 for i in range(200): 3349 if a.poll(0.01): 3350 break 3351 x = a.recv_bytes() 3352 self.assertEqual(s, x) 3353 3354 p.join() 3355 3356 @classmethod 3357 def _child_boundaries(cls, r): 3358 # Polling may "pull" a message in to the child process, but we 3359 # don't want it to pull only part of a message, as that would 3360 # corrupt the pipe for any other processes which might later 3361 # read from it. 3362 r.poll(5) 3363 3364 def test_boundaries(self): 3365 r, w = self.Pipe(False) 3366 p = self.Process(target=self._child_boundaries, args=(r,)) 3367 p.start() 3368 time.sleep(2) 3369 L = [b"first", b"second"] 3370 for obj in L: 3371 w.send_bytes(obj) 3372 w.close() 3373 p.join() 3374 self.assertIn(r.recv_bytes(), L) 3375 3376 @classmethod 3377 def _child_dont_merge(cls, b): 3378 b.send_bytes(b'a') 3379 b.send_bytes(b'b') 3380 b.send_bytes(b'cd') 3381 3382 def test_dont_merge(self): 3383 a, b = self.Pipe() 3384 self.assertEqual(a.poll(0.0), False) 3385 self.assertEqual(a.poll(0.1), False) 3386 3387 p = self.Process(target=self._child_dont_merge, args=(b,)) 3388 p.start() 3389 3390 self.assertEqual(a.recv_bytes(), b'a') 3391 self.assertEqual(a.poll(1.0), True) 3392 self.assertEqual(a.poll(1.0), True) 3393 self.assertEqual(a.recv_bytes(), b'b') 3394 self.assertEqual(a.poll(1.0), True) 3395 self.assertEqual(a.poll(1.0), True) 3396 self.assertEqual(a.poll(0.0), True) 3397 self.assertEqual(a.recv_bytes(), b'cd') 3398 3399 p.join() 3400 3401# 3402# Test of sending connection and socket objects between processes 3403# 3404 3405@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 3406class _TestPicklingConnections(BaseTestCase): 3407 3408 ALLOWED_TYPES = ('processes',) 3409 3410 @classmethod 3411 def tearDownClass(cls): 3412 from multiprocessing import resource_sharer 3413 resource_sharer.stop(timeout=TIMEOUT) 3414 3415 @classmethod 3416 def _listener(cls, conn, families): 3417 for fam in families: 3418 l = cls.connection.Listener(family=fam) 3419 conn.send(l.address) 3420 new_conn = l.accept() 3421 conn.send(new_conn) 3422 new_conn.close() 3423 l.close() 3424 3425 l = socket.create_server((test.support.HOST, 0)) 3426 conn.send(l.getsockname()) 3427 new_conn, addr = l.accept() 3428 conn.send(new_conn) 3429 new_conn.close() 3430 l.close() 3431 3432 conn.recv() 3433 3434 @classmethod 3435 def _remote(cls, conn): 3436 for (address, msg) in iter(conn.recv, None): 3437 client = cls.connection.Client(address) 3438 client.send(msg.upper()) 3439 client.close() 3440 3441 address, msg = conn.recv() 3442 client = socket.socket() 3443 client.connect(address) 3444 client.sendall(msg.upper()) 3445 client.close() 3446 3447 conn.close() 3448 3449 def test_pickling(self): 3450 families = self.connection.families 3451 3452 lconn, lconn0 = self.Pipe() 3453 lp = self.Process(target=self._listener, args=(lconn0, families)) 3454 lp.daemon = True 3455 lp.start() 3456 lconn0.close() 3457 3458 rconn, rconn0 = self.Pipe() 3459 rp = self.Process(target=self._remote, args=(rconn0,)) 3460 rp.daemon = True 3461 rp.start() 3462 rconn0.close() 3463 3464 for fam in families: 3465 msg = ('This connection uses family %s' % fam).encode('ascii') 3466 address = lconn.recv() 3467 rconn.send((address, msg)) 3468 new_conn = lconn.recv() 3469 self.assertEqual(new_conn.recv(), msg.upper()) 3470 3471 rconn.send(None) 3472 3473 msg = latin('This connection uses a normal socket') 3474 address = lconn.recv() 3475 rconn.send((address, msg)) 3476 new_conn = lconn.recv() 3477 buf = [] 3478 while True: 3479 s = new_conn.recv(100) 3480 if not s: 3481 break 3482 buf.append(s) 3483 buf = b''.join(buf) 3484 self.assertEqual(buf, msg.upper()) 3485 new_conn.close() 3486 3487 lconn.send(None) 3488 3489 rconn.close() 3490 lconn.close() 3491 3492 lp.join() 3493 rp.join() 3494 3495 @classmethod 3496 def child_access(cls, conn): 3497 w = conn.recv() 3498 w.send('all is well') 3499 w.close() 3500 3501 r = conn.recv() 3502 msg = r.recv() 3503 conn.send(msg*2) 3504 3505 conn.close() 3506 3507 def test_access(self): 3508 # On Windows, if we do not specify a destination pid when 3509 # using DupHandle then we need to be careful to use the 3510 # correct access flags for DuplicateHandle(), or else 3511 # DupHandle.detach() will raise PermissionError. For example, 3512 # for a read only pipe handle we should use 3513 # access=FILE_GENERIC_READ. (Unfortunately 3514 # DUPLICATE_SAME_ACCESS does not work.) 3515 conn, child_conn = self.Pipe() 3516 p = self.Process(target=self.child_access, args=(child_conn,)) 3517 p.daemon = True 3518 p.start() 3519 child_conn.close() 3520 3521 r, w = self.Pipe(duplex=False) 3522 conn.send(w) 3523 w.close() 3524 self.assertEqual(r.recv(), 'all is well') 3525 r.close() 3526 3527 r, w = self.Pipe(duplex=False) 3528 conn.send(r) 3529 r.close() 3530 w.send('foobar') 3531 w.close() 3532 self.assertEqual(conn.recv(), 'foobar'*2) 3533 3534 p.join() 3535 3536# 3537# 3538# 3539 3540class _TestHeap(BaseTestCase): 3541 3542 ALLOWED_TYPES = ('processes',) 3543 3544 def setUp(self): 3545 super().setUp() 3546 # Make pristine heap for these tests 3547 self.old_heap = multiprocessing.heap.BufferWrapper._heap 3548 multiprocessing.heap.BufferWrapper._heap = multiprocessing.heap.Heap() 3549 3550 def tearDown(self): 3551 multiprocessing.heap.BufferWrapper._heap = self.old_heap 3552 super().tearDown() 3553 3554 def test_heap(self): 3555 iterations = 5000 3556 maxblocks = 50 3557 blocks = [] 3558 3559 # get the heap object 3560 heap = multiprocessing.heap.BufferWrapper._heap 3561 heap._DISCARD_FREE_SPACE_LARGER_THAN = 0 3562 3563 # create and destroy lots of blocks of different sizes 3564 for i in range(iterations): 3565 size = int(random.lognormvariate(0, 1) * 1000) 3566 b = multiprocessing.heap.BufferWrapper(size) 3567 blocks.append(b) 3568 if len(blocks) > maxblocks: 3569 i = random.randrange(maxblocks) 3570 del blocks[i] 3571 del b 3572 3573 # verify the state of the heap 3574 with heap._lock: 3575 all = [] 3576 free = 0 3577 occupied = 0 3578 for L in list(heap._len_to_seq.values()): 3579 # count all free blocks in arenas 3580 for arena, start, stop in L: 3581 all.append((heap._arenas.index(arena), start, stop, 3582 stop-start, 'free')) 3583 free += (stop-start) 3584 for arena, arena_blocks in heap._allocated_blocks.items(): 3585 # count all allocated blocks in arenas 3586 for start, stop in arena_blocks: 3587 all.append((heap._arenas.index(arena), start, stop, 3588 stop-start, 'occupied')) 3589 occupied += (stop-start) 3590 3591 self.assertEqual(free + occupied, 3592 sum(arena.size for arena in heap._arenas)) 3593 3594 all.sort() 3595 3596 for i in range(len(all)-1): 3597 (arena, start, stop) = all[i][:3] 3598 (narena, nstart, nstop) = all[i+1][:3] 3599 if arena != narena: 3600 # Two different arenas 3601 self.assertEqual(stop, heap._arenas[arena].size) # last block 3602 self.assertEqual(nstart, 0) # first block 3603 else: 3604 # Same arena: two adjacent blocks 3605 self.assertEqual(stop, nstart) 3606 3607 # test free'ing all blocks 3608 random.shuffle(blocks) 3609 while blocks: 3610 blocks.pop() 3611 3612 self.assertEqual(heap._n_frees, heap._n_mallocs) 3613 self.assertEqual(len(heap._pending_free_blocks), 0) 3614 self.assertEqual(len(heap._arenas), 0) 3615 self.assertEqual(len(heap._allocated_blocks), 0, heap._allocated_blocks) 3616 self.assertEqual(len(heap._len_to_seq), 0) 3617 3618 def test_free_from_gc(self): 3619 # Check that freeing of blocks by the garbage collector doesn't deadlock 3620 # (issue #12352). 3621 # Make sure the GC is enabled, and set lower collection thresholds to 3622 # make collections more frequent (and increase the probability of 3623 # deadlock). 3624 if not gc.isenabled(): 3625 gc.enable() 3626 self.addCleanup(gc.disable) 3627 thresholds = gc.get_threshold() 3628 self.addCleanup(gc.set_threshold, *thresholds) 3629 gc.set_threshold(10) 3630 3631 # perform numerous block allocations, with cyclic references to make 3632 # sure objects are collected asynchronously by the gc 3633 for i in range(5000): 3634 a = multiprocessing.heap.BufferWrapper(1) 3635 b = multiprocessing.heap.BufferWrapper(1) 3636 # circular references 3637 a.buddy = b 3638 b.buddy = a 3639 3640# 3641# 3642# 3643 3644class _Foo(Structure): 3645 _fields_ = [ 3646 ('x', c_int), 3647 ('y', c_double), 3648 ('z', c_longlong,) 3649 ] 3650 3651class _TestSharedCTypes(BaseTestCase): 3652 3653 ALLOWED_TYPES = ('processes',) 3654 3655 def setUp(self): 3656 if not HAS_SHAREDCTYPES: 3657 self.skipTest("requires multiprocessing.sharedctypes") 3658 3659 @classmethod 3660 def _double(cls, x, y, z, foo, arr, string): 3661 x.value *= 2 3662 y.value *= 2 3663 z.value *= 2 3664 foo.x *= 2 3665 foo.y *= 2 3666 string.value *= 2 3667 for i in range(len(arr)): 3668 arr[i] *= 2 3669 3670 def test_sharedctypes(self, lock=False): 3671 x = Value('i', 7, lock=lock) 3672 y = Value(c_double, 1.0/3.0, lock=lock) 3673 z = Value(c_longlong, 2 ** 33, lock=lock) 3674 foo = Value(_Foo, 3, 2, lock=lock) 3675 arr = self.Array('d', list(range(10)), lock=lock) 3676 string = self.Array('c', 20, lock=lock) 3677 string.value = latin('hello') 3678 3679 p = self.Process(target=self._double, args=(x, y, z, foo, arr, string)) 3680 p.daemon = True 3681 p.start() 3682 p.join() 3683 3684 self.assertEqual(x.value, 14) 3685 self.assertAlmostEqual(y.value, 2.0/3.0) 3686 self.assertEqual(z.value, 2 ** 34) 3687 self.assertEqual(foo.x, 6) 3688 self.assertAlmostEqual(foo.y, 4.0) 3689 for i in range(10): 3690 self.assertAlmostEqual(arr[i], i*2) 3691 self.assertEqual(string.value, latin('hellohello')) 3692 3693 def test_synchronize(self): 3694 self.test_sharedctypes(lock=True) 3695 3696 def test_copy(self): 3697 foo = _Foo(2, 5.0, 2 ** 33) 3698 bar = copy(foo) 3699 foo.x = 0 3700 foo.y = 0 3701 foo.z = 0 3702 self.assertEqual(bar.x, 2) 3703 self.assertAlmostEqual(bar.y, 5.0) 3704 self.assertEqual(bar.z, 2 ** 33) 3705 3706 3707@unittest.skipUnless(HAS_SHMEM, "requires multiprocessing.shared_memory") 3708class _TestSharedMemory(BaseTestCase): 3709 3710 ALLOWED_TYPES = ('processes',) 3711 3712 @staticmethod 3713 def _attach_existing_shmem_then_write(shmem_name_or_obj, binary_data): 3714 if isinstance(shmem_name_or_obj, str): 3715 local_sms = shared_memory.SharedMemory(shmem_name_or_obj) 3716 else: 3717 local_sms = shmem_name_or_obj 3718 local_sms.buf[:len(binary_data)] = binary_data 3719 local_sms.close() 3720 3721 def test_shared_memory_basics(self): 3722 sms = shared_memory.SharedMemory('test01_tsmb', create=True, size=512) 3723 self.addCleanup(sms.unlink) 3724 3725 # Verify attributes are readable. 3726 self.assertEqual(sms.name, 'test01_tsmb') 3727 self.assertGreaterEqual(sms.size, 512) 3728 self.assertGreaterEqual(len(sms.buf), sms.size) 3729 3730 # Modify contents of shared memory segment through memoryview. 3731 sms.buf[0] = 42 3732 self.assertEqual(sms.buf[0], 42) 3733 3734 # Attach to existing shared memory segment. 3735 also_sms = shared_memory.SharedMemory('test01_tsmb') 3736 self.assertEqual(also_sms.buf[0], 42) 3737 also_sms.close() 3738 3739 # Attach to existing shared memory segment but specify a new size. 3740 same_sms = shared_memory.SharedMemory('test01_tsmb', size=20*sms.size) 3741 self.assertLess(same_sms.size, 20*sms.size) # Size was ignored. 3742 same_sms.close() 3743 3744 if shared_memory._USE_POSIX: 3745 # Posix Shared Memory can only be unlinked once. Here we 3746 # test an implementation detail that is not observed across 3747 # all supported platforms (since WindowsNamedSharedMemory 3748 # manages unlinking on its own and unlink() does nothing). 3749 # True release of shared memory segment does not necessarily 3750 # happen until process exits, depending on the OS platform. 3751 with self.assertRaises(FileNotFoundError): 3752 sms_uno = shared_memory.SharedMemory( 3753 'test01_dblunlink', 3754 create=True, 3755 size=5000 3756 ) 3757 3758 try: 3759 self.assertGreaterEqual(sms_uno.size, 5000) 3760 3761 sms_duo = shared_memory.SharedMemory('test01_dblunlink') 3762 sms_duo.unlink() # First shm_unlink() call. 3763 sms_duo.close() 3764 sms_uno.close() 3765 3766 finally: 3767 sms_uno.unlink() # A second shm_unlink() call is bad. 3768 3769 with self.assertRaises(FileExistsError): 3770 # Attempting to create a new shared memory segment with a 3771 # name that is already in use triggers an exception. 3772 there_can_only_be_one_sms = shared_memory.SharedMemory( 3773 'test01_tsmb', 3774 create=True, 3775 size=512 3776 ) 3777 3778 if shared_memory._USE_POSIX: 3779 # Requesting creation of a shared memory segment with the option 3780 # to attach to an existing segment, if that name is currently in 3781 # use, should not trigger an exception. 3782 # Note: Using a smaller size could possibly cause truncation of 3783 # the existing segment but is OS platform dependent. In the 3784 # case of MacOS/darwin, requesting a smaller size is disallowed. 3785 class OptionalAttachSharedMemory(shared_memory.SharedMemory): 3786 _flags = os.O_CREAT | os.O_RDWR 3787 ok_if_exists_sms = OptionalAttachSharedMemory('test01_tsmb') 3788 self.assertEqual(ok_if_exists_sms.size, sms.size) 3789 ok_if_exists_sms.close() 3790 3791 # Attempting to attach to an existing shared memory segment when 3792 # no segment exists with the supplied name triggers an exception. 3793 with self.assertRaises(FileNotFoundError): 3794 nonexisting_sms = shared_memory.SharedMemory('test01_notthere') 3795 nonexisting_sms.unlink() # Error should occur on prior line. 3796 3797 sms.close() 3798 3799 def test_shared_memory_across_processes(self): 3800 sms = shared_memory.SharedMemory('test02_tsmap', True, size=512) 3801 self.addCleanup(sms.unlink) 3802 3803 # Verify remote attachment to existing block by name is working. 3804 p = self.Process( 3805 target=self._attach_existing_shmem_then_write, 3806 args=(sms.name, b'howdy') 3807 ) 3808 p.daemon = True 3809 p.start() 3810 p.join() 3811 self.assertEqual(bytes(sms.buf[:5]), b'howdy') 3812 3813 # Verify pickling of SharedMemory instance also works. 3814 p = self.Process( 3815 target=self._attach_existing_shmem_then_write, 3816 args=(sms, b'HELLO') 3817 ) 3818 p.daemon = True 3819 p.start() 3820 p.join() 3821 self.assertEqual(bytes(sms.buf[:5]), b'HELLO') 3822 3823 sms.close() 3824 3825 @unittest.skipIf(os.name != "posix", "not feasible in non-posix platforms") 3826 def test_shared_memory_SharedMemoryServer_ignores_sigint(self): 3827 # bpo-36368: protect SharedMemoryManager server process from 3828 # KeyboardInterrupt signals. 3829 smm = multiprocessing.managers.SharedMemoryManager() 3830 smm.start() 3831 3832 # make sure the manager works properly at the beginning 3833 sl = smm.ShareableList(range(10)) 3834 3835 # the manager's server should ignore KeyboardInterrupt signals, and 3836 # maintain its connection with the current process, and success when 3837 # asked to deliver memory segments. 3838 os.kill(smm._process.pid, signal.SIGINT) 3839 3840 sl2 = smm.ShareableList(range(10)) 3841 3842 # test that the custom signal handler registered in the Manager does 3843 # not affect signal handling in the parent process. 3844 with self.assertRaises(KeyboardInterrupt): 3845 os.kill(os.getpid(), signal.SIGINT) 3846 3847 smm.shutdown() 3848 3849 @unittest.skipIf(os.name != "posix", "resource_tracker is posix only") 3850 def test_shared_memory_SharedMemoryManager_reuses_resource_tracker(self): 3851 # bpo-36867: test that a SharedMemoryManager uses the 3852 # same resource_tracker process as its parent. 3853 cmd = '''if 1: 3854 from multiprocessing.managers import SharedMemoryManager 3855 3856 3857 smm = SharedMemoryManager() 3858 smm.start() 3859 sl = smm.ShareableList(range(10)) 3860 smm.shutdown() 3861 ''' 3862 rc, out, err = test.support.script_helper.assert_python_ok('-c', cmd) 3863 3864 # Before bpo-36867 was fixed, a SharedMemoryManager not using the same 3865 # resource_tracker process as its parent would make the parent's 3866 # tracker complain about sl being leaked even though smm.shutdown() 3867 # properly released sl. 3868 self.assertFalse(err) 3869 3870 def test_shared_memory_SharedMemoryManager_basics(self): 3871 smm1 = multiprocessing.managers.SharedMemoryManager() 3872 with self.assertRaises(ValueError): 3873 smm1.SharedMemory(size=9) # Fails if SharedMemoryServer not started 3874 smm1.start() 3875 lol = [ smm1.ShareableList(range(i)) for i in range(5, 10) ] 3876 lom = [ smm1.SharedMemory(size=j) for j in range(32, 128, 16) ] 3877 doppleganger_list0 = shared_memory.ShareableList(name=lol[0].shm.name) 3878 self.assertEqual(len(doppleganger_list0), 5) 3879 doppleganger_shm0 = shared_memory.SharedMemory(name=lom[0].name) 3880 self.assertGreaterEqual(len(doppleganger_shm0.buf), 32) 3881 held_name = lom[0].name 3882 smm1.shutdown() 3883 if sys.platform != "win32": 3884 # Calls to unlink() have no effect on Windows platform; shared 3885 # memory will only be released once final process exits. 3886 with self.assertRaises(FileNotFoundError): 3887 # No longer there to be attached to again. 3888 absent_shm = shared_memory.SharedMemory(name=held_name) 3889 3890 with multiprocessing.managers.SharedMemoryManager() as smm2: 3891 sl = smm2.ShareableList("howdy") 3892 shm = smm2.SharedMemory(size=128) 3893 held_name = sl.shm.name 3894 if sys.platform != "win32": 3895 with self.assertRaises(FileNotFoundError): 3896 # No longer there to be attached to again. 3897 absent_sl = shared_memory.ShareableList(name=held_name) 3898 3899 3900 def test_shared_memory_ShareableList_basics(self): 3901 sl = shared_memory.ShareableList( 3902 ['howdy', b'HoWdY', -273.154, 100, None, True, 42] 3903 ) 3904 self.addCleanup(sl.shm.unlink) 3905 3906 # Verify attributes are readable. 3907 self.assertEqual(sl.format, '8s8sdqxxxxxx?xxxxxxxx?q') 3908 3909 # Exercise len(). 3910 self.assertEqual(len(sl), 7) 3911 3912 # Exercise index(). 3913 with warnings.catch_warnings(): 3914 # Suppress BytesWarning when comparing against b'HoWdY'. 3915 warnings.simplefilter('ignore') 3916 with self.assertRaises(ValueError): 3917 sl.index('100') 3918 self.assertEqual(sl.index(100), 3) 3919 3920 # Exercise retrieving individual values. 3921 self.assertEqual(sl[0], 'howdy') 3922 self.assertEqual(sl[-2], True) 3923 3924 # Exercise iterability. 3925 self.assertEqual( 3926 tuple(sl), 3927 ('howdy', b'HoWdY', -273.154, 100, None, True, 42) 3928 ) 3929 3930 # Exercise modifying individual values. 3931 sl[3] = 42 3932 self.assertEqual(sl[3], 42) 3933 sl[4] = 'some' # Change type at a given position. 3934 self.assertEqual(sl[4], 'some') 3935 self.assertEqual(sl.format, '8s8sdq8sxxxxxxx?q') 3936 with self.assertRaises(ValueError): 3937 sl[4] = 'far too many' # Exceeds available storage. 3938 self.assertEqual(sl[4], 'some') 3939 3940 # Exercise count(). 3941 with warnings.catch_warnings(): 3942 # Suppress BytesWarning when comparing against b'HoWdY'. 3943 warnings.simplefilter('ignore') 3944 self.assertEqual(sl.count(42), 2) 3945 self.assertEqual(sl.count(b'HoWdY'), 1) 3946 self.assertEqual(sl.count(b'adios'), 0) 3947 3948 # Exercise creating a duplicate. 3949 sl_copy = shared_memory.ShareableList(sl, name='test03_duplicate') 3950 try: 3951 self.assertNotEqual(sl.shm.name, sl_copy.shm.name) 3952 self.assertEqual('test03_duplicate', sl_copy.shm.name) 3953 self.assertEqual(list(sl), list(sl_copy)) 3954 self.assertEqual(sl.format, sl_copy.format) 3955 sl_copy[-1] = 77 3956 self.assertEqual(sl_copy[-1], 77) 3957 self.assertNotEqual(sl[-1], 77) 3958 sl_copy.shm.close() 3959 finally: 3960 sl_copy.shm.unlink() 3961 3962 # Obtain a second handle on the same ShareableList. 3963 sl_tethered = shared_memory.ShareableList(name=sl.shm.name) 3964 self.assertEqual(sl.shm.name, sl_tethered.shm.name) 3965 sl_tethered[-1] = 880 3966 self.assertEqual(sl[-1], 880) 3967 sl_tethered.shm.close() 3968 3969 sl.shm.close() 3970 3971 # Exercise creating an empty ShareableList. 3972 empty_sl = shared_memory.ShareableList() 3973 try: 3974 self.assertEqual(len(empty_sl), 0) 3975 self.assertEqual(empty_sl.format, '') 3976 self.assertEqual(empty_sl.count('any'), 0) 3977 with self.assertRaises(ValueError): 3978 empty_sl.index(None) 3979 empty_sl.shm.close() 3980 finally: 3981 empty_sl.shm.unlink() 3982 3983 def test_shared_memory_ShareableList_pickling(self): 3984 sl = shared_memory.ShareableList(range(10)) 3985 self.addCleanup(sl.shm.unlink) 3986 3987 serialized_sl = pickle.dumps(sl) 3988 deserialized_sl = pickle.loads(serialized_sl) 3989 self.assertTrue( 3990 isinstance(deserialized_sl, shared_memory.ShareableList) 3991 ) 3992 self.assertTrue(deserialized_sl[-1], 9) 3993 self.assertFalse(sl is deserialized_sl) 3994 deserialized_sl[4] = "changed" 3995 self.assertEqual(sl[4], "changed") 3996 3997 # Verify data is not being put into the pickled representation. 3998 name = 'a' * len(sl.shm.name) 3999 larger_sl = shared_memory.ShareableList(range(400)) 4000 self.addCleanup(larger_sl.shm.unlink) 4001 serialized_larger_sl = pickle.dumps(larger_sl) 4002 self.assertTrue(len(serialized_sl) == len(serialized_larger_sl)) 4003 larger_sl.shm.close() 4004 4005 deserialized_sl.shm.close() 4006 sl.shm.close() 4007 4008 def test_shared_memory_cleaned_after_process_termination(self): 4009 cmd = '''if 1: 4010 import os, time, sys 4011 from multiprocessing import shared_memory 4012 4013 # Create a shared_memory segment, and send the segment name 4014 sm = shared_memory.SharedMemory(create=True, size=10) 4015 sys.stdout.write(sm.name + '\\n') 4016 sys.stdout.flush() 4017 time.sleep(100) 4018 ''' 4019 with subprocess.Popen([sys.executable, '-E', '-c', cmd], 4020 stdout=subprocess.PIPE, 4021 stderr=subprocess.PIPE) as p: 4022 name = p.stdout.readline().strip().decode() 4023 4024 # killing abruptly processes holding reference to a shared memory 4025 # segment should not leak the given memory segment. 4026 p.terminate() 4027 p.wait() 4028 4029 deadline = time.monotonic() + 60 4030 t = 0.1 4031 while time.monotonic() < deadline: 4032 time.sleep(t) 4033 t = min(t*2, 5) 4034 try: 4035 smm = shared_memory.SharedMemory(name, create=False) 4036 except FileNotFoundError: 4037 break 4038 else: 4039 raise AssertionError("A SharedMemory segment was leaked after" 4040 " a process was abruptly terminated.") 4041 4042 if os.name == 'posix': 4043 # A warning was emitted by the subprocess' own 4044 # resource_tracker (on Windows, shared memory segments 4045 # are released automatically by the OS). 4046 err = p.stderr.read().decode() 4047 self.assertIn( 4048 "resource_tracker: There appear to be 1 leaked " 4049 "shared_memory objects to clean up at shutdown", err) 4050 4051# 4052# 4053# 4054 4055class _TestFinalize(BaseTestCase): 4056 4057 ALLOWED_TYPES = ('processes',) 4058 4059 def setUp(self): 4060 self.registry_backup = util._finalizer_registry.copy() 4061 util._finalizer_registry.clear() 4062 4063 def tearDown(self): 4064 self.assertFalse(util._finalizer_registry) 4065 util._finalizer_registry.update(self.registry_backup) 4066 4067 @classmethod 4068 def _test_finalize(cls, conn): 4069 class Foo(object): 4070 pass 4071 4072 a = Foo() 4073 util.Finalize(a, conn.send, args=('a',)) 4074 del a # triggers callback for a 4075 4076 b = Foo() 4077 close_b = util.Finalize(b, conn.send, args=('b',)) 4078 close_b() # triggers callback for b 4079 close_b() # does nothing because callback has already been called 4080 del b # does nothing because callback has already been called 4081 4082 c = Foo() 4083 util.Finalize(c, conn.send, args=('c',)) 4084 4085 d10 = Foo() 4086 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1) 4087 4088 d01 = Foo() 4089 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0) 4090 d02 = Foo() 4091 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0) 4092 d03 = Foo() 4093 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0) 4094 4095 util.Finalize(None, conn.send, args=('e',), exitpriority=-10) 4096 4097 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100) 4098 4099 # call multiprocessing's cleanup function then exit process without 4100 # garbage collecting locals 4101 util._exit_function() 4102 conn.close() 4103 os._exit(0) 4104 4105 def test_finalize(self): 4106 conn, child_conn = self.Pipe() 4107 4108 p = self.Process(target=self._test_finalize, args=(child_conn,)) 4109 p.daemon = True 4110 p.start() 4111 p.join() 4112 4113 result = [obj for obj in iter(conn.recv, 'STOP')] 4114 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e']) 4115 4116 def test_thread_safety(self): 4117 # bpo-24484: _run_finalizers() should be thread-safe 4118 def cb(): 4119 pass 4120 4121 class Foo(object): 4122 def __init__(self): 4123 self.ref = self # create reference cycle 4124 # insert finalizer at random key 4125 util.Finalize(self, cb, exitpriority=random.randint(1, 100)) 4126 4127 finish = False 4128 exc = None 4129 4130 def run_finalizers(): 4131 nonlocal exc 4132 while not finish: 4133 time.sleep(random.random() * 1e-1) 4134 try: 4135 # A GC run will eventually happen during this, 4136 # collecting stale Foo's and mutating the registry 4137 util._run_finalizers() 4138 except Exception as e: 4139 exc = e 4140 4141 def make_finalizers(): 4142 nonlocal exc 4143 d = {} 4144 while not finish: 4145 try: 4146 # Old Foo's get gradually replaced and later 4147 # collected by the GC (because of the cyclic ref) 4148 d[random.getrandbits(5)] = {Foo() for i in range(10)} 4149 except Exception as e: 4150 exc = e 4151 d.clear() 4152 4153 old_interval = sys.getswitchinterval() 4154 old_threshold = gc.get_threshold() 4155 try: 4156 sys.setswitchinterval(1e-6) 4157 gc.set_threshold(5, 5, 5) 4158 threads = [threading.Thread(target=run_finalizers), 4159 threading.Thread(target=make_finalizers)] 4160 with test.support.start_threads(threads): 4161 time.sleep(4.0) # Wait a bit to trigger race condition 4162 finish = True 4163 if exc is not None: 4164 raise exc 4165 finally: 4166 sys.setswitchinterval(old_interval) 4167 gc.set_threshold(*old_threshold) 4168 gc.collect() # Collect remaining Foo's 4169 4170 4171# 4172# Test that from ... import * works for each module 4173# 4174 4175class _TestImportStar(unittest.TestCase): 4176 4177 def get_module_names(self): 4178 import glob 4179 folder = os.path.dirname(multiprocessing.__file__) 4180 pattern = os.path.join(folder, '*.py') 4181 files = glob.glob(pattern) 4182 modules = [os.path.splitext(os.path.split(f)[1])[0] for f in files] 4183 modules = ['multiprocessing.' + m for m in modules] 4184 modules.remove('multiprocessing.__init__') 4185 modules.append('multiprocessing') 4186 return modules 4187 4188 def test_import(self): 4189 modules = self.get_module_names() 4190 if sys.platform == 'win32': 4191 modules.remove('multiprocessing.popen_fork') 4192 modules.remove('multiprocessing.popen_forkserver') 4193 modules.remove('multiprocessing.popen_spawn_posix') 4194 else: 4195 modules.remove('multiprocessing.popen_spawn_win32') 4196 if not HAS_REDUCTION: 4197 modules.remove('multiprocessing.popen_forkserver') 4198 4199 if c_int is None: 4200 # This module requires _ctypes 4201 modules.remove('multiprocessing.sharedctypes') 4202 4203 for name in modules: 4204 __import__(name) 4205 mod = sys.modules[name] 4206 self.assertTrue(hasattr(mod, '__all__'), name) 4207 4208 for attr in mod.__all__: 4209 self.assertTrue( 4210 hasattr(mod, attr), 4211 '%r does not have attribute %r' % (mod, attr) 4212 ) 4213 4214# 4215# Quick test that logging works -- does not test logging output 4216# 4217 4218class _TestLogging(BaseTestCase): 4219 4220 ALLOWED_TYPES = ('processes',) 4221 4222 def test_enable_logging(self): 4223 logger = multiprocessing.get_logger() 4224 logger.setLevel(util.SUBWARNING) 4225 self.assertTrue(logger is not None) 4226 logger.debug('this will not be printed') 4227 logger.info('nor will this') 4228 logger.setLevel(LOG_LEVEL) 4229 4230 @classmethod 4231 def _test_level(cls, conn): 4232 logger = multiprocessing.get_logger() 4233 conn.send(logger.getEffectiveLevel()) 4234 4235 def test_level(self): 4236 LEVEL1 = 32 4237 LEVEL2 = 37 4238 4239 logger = multiprocessing.get_logger() 4240 root_logger = logging.getLogger() 4241 root_level = root_logger.level 4242 4243 reader, writer = multiprocessing.Pipe(duplex=False) 4244 4245 logger.setLevel(LEVEL1) 4246 p = self.Process(target=self._test_level, args=(writer,)) 4247 p.start() 4248 self.assertEqual(LEVEL1, reader.recv()) 4249 p.join() 4250 p.close() 4251 4252 logger.setLevel(logging.NOTSET) 4253 root_logger.setLevel(LEVEL2) 4254 p = self.Process(target=self._test_level, args=(writer,)) 4255 p.start() 4256 self.assertEqual(LEVEL2, reader.recv()) 4257 p.join() 4258 p.close() 4259 4260 root_logger.setLevel(root_level) 4261 logger.setLevel(level=LOG_LEVEL) 4262 4263 4264# class _TestLoggingProcessName(BaseTestCase): 4265# 4266# def handle(self, record): 4267# assert record.processName == multiprocessing.current_process().name 4268# self.__handled = True 4269# 4270# def test_logging(self): 4271# handler = logging.Handler() 4272# handler.handle = self.handle 4273# self.__handled = False 4274# # Bypass getLogger() and side-effects 4275# logger = logging.getLoggerClass()( 4276# 'multiprocessing.test.TestLoggingProcessName') 4277# logger.addHandler(handler) 4278# logger.propagate = False 4279# 4280# logger.warn('foo') 4281# assert self.__handled 4282 4283# 4284# Check that Process.join() retries if os.waitpid() fails with EINTR 4285# 4286 4287class _TestPollEintr(BaseTestCase): 4288 4289 ALLOWED_TYPES = ('processes',) 4290 4291 @classmethod 4292 def _killer(cls, pid): 4293 time.sleep(0.1) 4294 os.kill(pid, signal.SIGUSR1) 4295 4296 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') 4297 def test_poll_eintr(self): 4298 got_signal = [False] 4299 def record(*args): 4300 got_signal[0] = True 4301 pid = os.getpid() 4302 oldhandler = signal.signal(signal.SIGUSR1, record) 4303 try: 4304 killer = self.Process(target=self._killer, args=(pid,)) 4305 killer.start() 4306 try: 4307 p = self.Process(target=time.sleep, args=(2,)) 4308 p.start() 4309 p.join() 4310 finally: 4311 killer.join() 4312 self.assertTrue(got_signal[0]) 4313 self.assertEqual(p.exitcode, 0) 4314 finally: 4315 signal.signal(signal.SIGUSR1, oldhandler) 4316 4317# 4318# Test to verify handle verification, see issue 3321 4319# 4320 4321class TestInvalidHandle(unittest.TestCase): 4322 4323 @unittest.skipIf(WIN32, "skipped on Windows") 4324 def test_invalid_handles(self): 4325 conn = multiprocessing.connection.Connection(44977608) 4326 # check that poll() doesn't crash 4327 try: 4328 conn.poll() 4329 except (ValueError, OSError): 4330 pass 4331 finally: 4332 # Hack private attribute _handle to avoid printing an error 4333 # in conn.__del__ 4334 conn._handle = None 4335 self.assertRaises((ValueError, OSError), 4336 multiprocessing.connection.Connection, -1) 4337 4338 4339 4340class OtherTest(unittest.TestCase): 4341 # TODO: add more tests for deliver/answer challenge. 4342 def test_deliver_challenge_auth_failure(self): 4343 class _FakeConnection(object): 4344 def recv_bytes(self, size): 4345 return b'something bogus' 4346 def send_bytes(self, data): 4347 pass 4348 self.assertRaises(multiprocessing.AuthenticationError, 4349 multiprocessing.connection.deliver_challenge, 4350 _FakeConnection(), b'abc') 4351 4352 def test_answer_challenge_auth_failure(self): 4353 class _FakeConnection(object): 4354 def __init__(self): 4355 self.count = 0 4356 def recv_bytes(self, size): 4357 self.count += 1 4358 if self.count == 1: 4359 return multiprocessing.connection.CHALLENGE 4360 elif self.count == 2: 4361 return b'something bogus' 4362 return b'' 4363 def send_bytes(self, data): 4364 pass 4365 self.assertRaises(multiprocessing.AuthenticationError, 4366 multiprocessing.connection.answer_challenge, 4367 _FakeConnection(), b'abc') 4368 4369# 4370# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585 4371# 4372 4373def initializer(ns): 4374 ns.test += 1 4375 4376class TestInitializers(unittest.TestCase): 4377 def setUp(self): 4378 self.mgr = multiprocessing.Manager() 4379 self.ns = self.mgr.Namespace() 4380 self.ns.test = 0 4381 4382 def tearDown(self): 4383 self.mgr.shutdown() 4384 self.mgr.join() 4385 4386 def test_manager_initializer(self): 4387 m = multiprocessing.managers.SyncManager() 4388 self.assertRaises(TypeError, m.start, 1) 4389 m.start(initializer, (self.ns,)) 4390 self.assertEqual(self.ns.test, 1) 4391 m.shutdown() 4392 m.join() 4393 4394 def test_pool_initializer(self): 4395 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1) 4396 p = multiprocessing.Pool(1, initializer, (self.ns,)) 4397 p.close() 4398 p.join() 4399 self.assertEqual(self.ns.test, 1) 4400 4401# 4402# Issue 5155, 5313, 5331: Test process in processes 4403# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior 4404# 4405 4406def _this_sub_process(q): 4407 try: 4408 item = q.get(block=False) 4409 except pyqueue.Empty: 4410 pass 4411 4412def _test_process(): 4413 queue = multiprocessing.Queue() 4414 subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,)) 4415 subProc.daemon = True 4416 subProc.start() 4417 subProc.join() 4418 4419def _afunc(x): 4420 return x*x 4421 4422def pool_in_process(): 4423 pool = multiprocessing.Pool(processes=4) 4424 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7]) 4425 pool.close() 4426 pool.join() 4427 4428class _file_like(object): 4429 def __init__(self, delegate): 4430 self._delegate = delegate 4431 self._pid = None 4432 4433 @property 4434 def cache(self): 4435 pid = os.getpid() 4436 # There are no race conditions since fork keeps only the running thread 4437 if pid != self._pid: 4438 self._pid = pid 4439 self._cache = [] 4440 return self._cache 4441 4442 def write(self, data): 4443 self.cache.append(data) 4444 4445 def flush(self): 4446 self._delegate.write(''.join(self.cache)) 4447 self._cache = [] 4448 4449class TestStdinBadfiledescriptor(unittest.TestCase): 4450 4451 def test_queue_in_process(self): 4452 proc = multiprocessing.Process(target=_test_process) 4453 proc.start() 4454 proc.join() 4455 4456 def test_pool_in_process(self): 4457 p = multiprocessing.Process(target=pool_in_process) 4458 p.start() 4459 p.join() 4460 4461 def test_flushing(self): 4462 sio = io.StringIO() 4463 flike = _file_like(sio) 4464 flike.write('foo') 4465 proc = multiprocessing.Process(target=lambda: flike.flush()) 4466 flike.flush() 4467 assert sio.getvalue() == 'foo' 4468 4469 4470class TestWait(unittest.TestCase): 4471 4472 @classmethod 4473 def _child_test_wait(cls, w, slow): 4474 for i in range(10): 4475 if slow: 4476 time.sleep(random.random()*0.1) 4477 w.send((i, os.getpid())) 4478 w.close() 4479 4480 def test_wait(self, slow=False): 4481 from multiprocessing.connection import wait 4482 readers = [] 4483 procs = [] 4484 messages = [] 4485 4486 for i in range(4): 4487 r, w = multiprocessing.Pipe(duplex=False) 4488 p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow)) 4489 p.daemon = True 4490 p.start() 4491 w.close() 4492 readers.append(r) 4493 procs.append(p) 4494 self.addCleanup(p.join) 4495 4496 while readers: 4497 for r in wait(readers): 4498 try: 4499 msg = r.recv() 4500 except EOFError: 4501 readers.remove(r) 4502 r.close() 4503 else: 4504 messages.append(msg) 4505 4506 messages.sort() 4507 expected = sorted((i, p.pid) for i in range(10) for p in procs) 4508 self.assertEqual(messages, expected) 4509 4510 @classmethod 4511 def _child_test_wait_socket(cls, address, slow): 4512 s = socket.socket() 4513 s.connect(address) 4514 for i in range(10): 4515 if slow: 4516 time.sleep(random.random()*0.1) 4517 s.sendall(('%s\n' % i).encode('ascii')) 4518 s.close() 4519 4520 def test_wait_socket(self, slow=False): 4521 from multiprocessing.connection import wait 4522 l = socket.create_server((test.support.HOST, 0)) 4523 addr = l.getsockname() 4524 readers = [] 4525 procs = [] 4526 dic = {} 4527 4528 for i in range(4): 4529 p = multiprocessing.Process(target=self._child_test_wait_socket, 4530 args=(addr, slow)) 4531 p.daemon = True 4532 p.start() 4533 procs.append(p) 4534 self.addCleanup(p.join) 4535 4536 for i in range(4): 4537 r, _ = l.accept() 4538 readers.append(r) 4539 dic[r] = [] 4540 l.close() 4541 4542 while readers: 4543 for r in wait(readers): 4544 msg = r.recv(32) 4545 if not msg: 4546 readers.remove(r) 4547 r.close() 4548 else: 4549 dic[r].append(msg) 4550 4551 expected = ''.join('%s\n' % i for i in range(10)).encode('ascii') 4552 for v in dic.values(): 4553 self.assertEqual(b''.join(v), expected) 4554 4555 def test_wait_slow(self): 4556 self.test_wait(True) 4557 4558 def test_wait_socket_slow(self): 4559 self.test_wait_socket(True) 4560 4561 def test_wait_timeout(self): 4562 from multiprocessing.connection import wait 4563 4564 expected = 5 4565 a, b = multiprocessing.Pipe() 4566 4567 start = time.monotonic() 4568 res = wait([a, b], expected) 4569 delta = time.monotonic() - start 4570 4571 self.assertEqual(res, []) 4572 self.assertLess(delta, expected * 2) 4573 self.assertGreater(delta, expected * 0.5) 4574 4575 b.send(None) 4576 4577 start = time.monotonic() 4578 res = wait([a, b], 20) 4579 delta = time.monotonic() - start 4580 4581 self.assertEqual(res, [a]) 4582 self.assertLess(delta, 0.4) 4583 4584 @classmethod 4585 def signal_and_sleep(cls, sem, period): 4586 sem.release() 4587 time.sleep(period) 4588 4589 def test_wait_integer(self): 4590 from multiprocessing.connection import wait 4591 4592 expected = 3 4593 sorted_ = lambda l: sorted(l, key=lambda x: id(x)) 4594 sem = multiprocessing.Semaphore(0) 4595 a, b = multiprocessing.Pipe() 4596 p = multiprocessing.Process(target=self.signal_and_sleep, 4597 args=(sem, expected)) 4598 4599 p.start() 4600 self.assertIsInstance(p.sentinel, int) 4601 self.assertTrue(sem.acquire(timeout=20)) 4602 4603 start = time.monotonic() 4604 res = wait([a, p.sentinel, b], expected + 20) 4605 delta = time.monotonic() - start 4606 4607 self.assertEqual(res, [p.sentinel]) 4608 self.assertLess(delta, expected + 2) 4609 self.assertGreater(delta, expected - 2) 4610 4611 a.send(None) 4612 4613 start = time.monotonic() 4614 res = wait([a, p.sentinel, b], 20) 4615 delta = time.monotonic() - start 4616 4617 self.assertEqual(sorted_(res), sorted_([p.sentinel, b])) 4618 self.assertLess(delta, 0.4) 4619 4620 b.send(None) 4621 4622 start = time.monotonic() 4623 res = wait([a, p.sentinel, b], 20) 4624 delta = time.monotonic() - start 4625 4626 self.assertEqual(sorted_(res), sorted_([a, p.sentinel, b])) 4627 self.assertLess(delta, 0.4) 4628 4629 p.terminate() 4630 p.join() 4631 4632 def test_neg_timeout(self): 4633 from multiprocessing.connection import wait 4634 a, b = multiprocessing.Pipe() 4635 t = time.monotonic() 4636 res = wait([a], timeout=-1) 4637 t = time.monotonic() - t 4638 self.assertEqual(res, []) 4639 self.assertLess(t, 1) 4640 a.close() 4641 b.close() 4642 4643# 4644# Issue 14151: Test invalid family on invalid environment 4645# 4646 4647class TestInvalidFamily(unittest.TestCase): 4648 4649 @unittest.skipIf(WIN32, "skipped on Windows") 4650 def test_invalid_family(self): 4651 with self.assertRaises(ValueError): 4652 multiprocessing.connection.Listener(r'\\.\test') 4653 4654 @unittest.skipUnless(WIN32, "skipped on non-Windows platforms") 4655 def test_invalid_family_win32(self): 4656 with self.assertRaises(ValueError): 4657 multiprocessing.connection.Listener('/var/test.pipe') 4658 4659# 4660# Issue 12098: check sys.flags of child matches that for parent 4661# 4662 4663class TestFlags(unittest.TestCase): 4664 @classmethod 4665 def run_in_grandchild(cls, conn): 4666 conn.send(tuple(sys.flags)) 4667 4668 @classmethod 4669 def run_in_child(cls): 4670 import json 4671 r, w = multiprocessing.Pipe(duplex=False) 4672 p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,)) 4673 p.start() 4674 grandchild_flags = r.recv() 4675 p.join() 4676 r.close() 4677 w.close() 4678 flags = (tuple(sys.flags), grandchild_flags) 4679 print(json.dumps(flags)) 4680 4681 def test_flags(self): 4682 import json 4683 # start child process using unusual flags 4684 prog = ('from test._test_multiprocessing import TestFlags; ' + 4685 'TestFlags.run_in_child()') 4686 data = subprocess.check_output( 4687 [sys.executable, '-E', '-S', '-O', '-c', prog]) 4688 child_flags, grandchild_flags = json.loads(data.decode('ascii')) 4689 self.assertEqual(child_flags, grandchild_flags) 4690 4691# 4692# Test interaction with socket timeouts - see Issue #6056 4693# 4694 4695class TestTimeouts(unittest.TestCase): 4696 @classmethod 4697 def _test_timeout(cls, child, address): 4698 time.sleep(1) 4699 child.send(123) 4700 child.close() 4701 conn = multiprocessing.connection.Client(address) 4702 conn.send(456) 4703 conn.close() 4704 4705 def test_timeout(self): 4706 old_timeout = socket.getdefaulttimeout() 4707 try: 4708 socket.setdefaulttimeout(0.1) 4709 parent, child = multiprocessing.Pipe(duplex=True) 4710 l = multiprocessing.connection.Listener(family='AF_INET') 4711 p = multiprocessing.Process(target=self._test_timeout, 4712 args=(child, l.address)) 4713 p.start() 4714 child.close() 4715 self.assertEqual(parent.recv(), 123) 4716 parent.close() 4717 conn = l.accept() 4718 self.assertEqual(conn.recv(), 456) 4719 conn.close() 4720 l.close() 4721 join_process(p) 4722 finally: 4723 socket.setdefaulttimeout(old_timeout) 4724 4725# 4726# Test what happens with no "if __name__ == '__main__'" 4727# 4728 4729class TestNoForkBomb(unittest.TestCase): 4730 def test_noforkbomb(self): 4731 sm = multiprocessing.get_start_method() 4732 name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py') 4733 if sm != 'fork': 4734 rc, out, err = test.support.script_helper.assert_python_failure(name, sm) 4735 self.assertEqual(out, b'') 4736 self.assertIn(b'RuntimeError', err) 4737 else: 4738 rc, out, err = test.support.script_helper.assert_python_ok(name, sm) 4739 self.assertEqual(out.rstrip(), b'123') 4740 self.assertEqual(err, b'') 4741 4742# 4743# Issue #17555: ForkAwareThreadLock 4744# 4745 4746class TestForkAwareThreadLock(unittest.TestCase): 4747 # We recursively start processes. Issue #17555 meant that the 4748 # after fork registry would get duplicate entries for the same 4749 # lock. The size of the registry at generation n was ~2**n. 4750 4751 @classmethod 4752 def child(cls, n, conn): 4753 if n > 1: 4754 p = multiprocessing.Process(target=cls.child, args=(n-1, conn)) 4755 p.start() 4756 conn.close() 4757 join_process(p) 4758 else: 4759 conn.send(len(util._afterfork_registry)) 4760 conn.close() 4761 4762 def test_lock(self): 4763 r, w = multiprocessing.Pipe(False) 4764 l = util.ForkAwareThreadLock() 4765 old_size = len(util._afterfork_registry) 4766 p = multiprocessing.Process(target=self.child, args=(5, w)) 4767 p.start() 4768 w.close() 4769 new_size = r.recv() 4770 join_process(p) 4771 self.assertLessEqual(new_size, old_size) 4772 4773# 4774# Check that non-forked child processes do not inherit unneeded fds/handles 4775# 4776 4777class TestCloseFds(unittest.TestCase): 4778 4779 def get_high_socket_fd(self): 4780 if WIN32: 4781 # The child process will not have any socket handles, so 4782 # calling socket.fromfd() should produce WSAENOTSOCK even 4783 # if there is a handle of the same number. 4784 return socket.socket().detach() 4785 else: 4786 # We want to produce a socket with an fd high enough that a 4787 # freshly created child process will not have any fds as high. 4788 fd = socket.socket().detach() 4789 to_close = [] 4790 while fd < 50: 4791 to_close.append(fd) 4792 fd = os.dup(fd) 4793 for x in to_close: 4794 os.close(x) 4795 return fd 4796 4797 def close(self, fd): 4798 if WIN32: 4799 socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=fd).close() 4800 else: 4801 os.close(fd) 4802 4803 @classmethod 4804 def _test_closefds(cls, conn, fd): 4805 try: 4806 s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM) 4807 except Exception as e: 4808 conn.send(e) 4809 else: 4810 s.close() 4811 conn.send(None) 4812 4813 def test_closefd(self): 4814 if not HAS_REDUCTION: 4815 raise unittest.SkipTest('requires fd pickling') 4816 4817 reader, writer = multiprocessing.Pipe() 4818 fd = self.get_high_socket_fd() 4819 try: 4820 p = multiprocessing.Process(target=self._test_closefds, 4821 args=(writer, fd)) 4822 p.start() 4823 writer.close() 4824 e = reader.recv() 4825 join_process(p) 4826 finally: 4827 self.close(fd) 4828 writer.close() 4829 reader.close() 4830 4831 if multiprocessing.get_start_method() == 'fork': 4832 self.assertIs(e, None) 4833 else: 4834 WSAENOTSOCK = 10038 4835 self.assertIsInstance(e, OSError) 4836 self.assertTrue(e.errno == errno.EBADF or 4837 e.winerror == WSAENOTSOCK, e) 4838 4839# 4840# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc 4841# 4842 4843class TestIgnoreEINTR(unittest.TestCase): 4844 4845 # Sending CONN_MAX_SIZE bytes into a multiprocessing pipe must block 4846 CONN_MAX_SIZE = max(support.PIPE_MAX_SIZE, support.SOCK_MAX_SIZE) 4847 4848 @classmethod 4849 def _test_ignore(cls, conn): 4850 def handler(signum, frame): 4851 pass 4852 signal.signal(signal.SIGUSR1, handler) 4853 conn.send('ready') 4854 x = conn.recv() 4855 conn.send(x) 4856 conn.send_bytes(b'x' * cls.CONN_MAX_SIZE) 4857 4858 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') 4859 def test_ignore(self): 4860 conn, child_conn = multiprocessing.Pipe() 4861 try: 4862 p = multiprocessing.Process(target=self._test_ignore, 4863 args=(child_conn,)) 4864 p.daemon = True 4865 p.start() 4866 child_conn.close() 4867 self.assertEqual(conn.recv(), 'ready') 4868 time.sleep(0.1) 4869 os.kill(p.pid, signal.SIGUSR1) 4870 time.sleep(0.1) 4871 conn.send(1234) 4872 self.assertEqual(conn.recv(), 1234) 4873 time.sleep(0.1) 4874 os.kill(p.pid, signal.SIGUSR1) 4875 self.assertEqual(conn.recv_bytes(), b'x' * self.CONN_MAX_SIZE) 4876 time.sleep(0.1) 4877 p.join() 4878 finally: 4879 conn.close() 4880 4881 @classmethod 4882 def _test_ignore_listener(cls, conn): 4883 def handler(signum, frame): 4884 pass 4885 signal.signal(signal.SIGUSR1, handler) 4886 with multiprocessing.connection.Listener() as l: 4887 conn.send(l.address) 4888 a = l.accept() 4889 a.send('welcome') 4890 4891 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') 4892 def test_ignore_listener(self): 4893 conn, child_conn = multiprocessing.Pipe() 4894 try: 4895 p = multiprocessing.Process(target=self._test_ignore_listener, 4896 args=(child_conn,)) 4897 p.daemon = True 4898 p.start() 4899 child_conn.close() 4900 address = conn.recv() 4901 time.sleep(0.1) 4902 os.kill(p.pid, signal.SIGUSR1) 4903 time.sleep(0.1) 4904 client = multiprocessing.connection.Client(address) 4905 self.assertEqual(client.recv(), 'welcome') 4906 p.join() 4907 finally: 4908 conn.close() 4909 4910class TestStartMethod(unittest.TestCase): 4911 @classmethod 4912 def _check_context(cls, conn): 4913 conn.send(multiprocessing.get_start_method()) 4914 4915 def check_context(self, ctx): 4916 r, w = ctx.Pipe(duplex=False) 4917 p = ctx.Process(target=self._check_context, args=(w,)) 4918 p.start() 4919 w.close() 4920 child_method = r.recv() 4921 r.close() 4922 p.join() 4923 self.assertEqual(child_method, ctx.get_start_method()) 4924 4925 def test_context(self): 4926 for method in ('fork', 'spawn', 'forkserver'): 4927 try: 4928 ctx = multiprocessing.get_context(method) 4929 except ValueError: 4930 continue 4931 self.assertEqual(ctx.get_start_method(), method) 4932 self.assertIs(ctx.get_context(), ctx) 4933 self.assertRaises(ValueError, ctx.set_start_method, 'spawn') 4934 self.assertRaises(ValueError, ctx.set_start_method, None) 4935 self.check_context(ctx) 4936 4937 def test_set_get(self): 4938 multiprocessing.set_forkserver_preload(PRELOAD) 4939 count = 0 4940 old_method = multiprocessing.get_start_method() 4941 try: 4942 for method in ('fork', 'spawn', 'forkserver'): 4943 try: 4944 multiprocessing.set_start_method(method, force=True) 4945 except ValueError: 4946 continue 4947 self.assertEqual(multiprocessing.get_start_method(), method) 4948 ctx = multiprocessing.get_context() 4949 self.assertEqual(ctx.get_start_method(), method) 4950 self.assertTrue(type(ctx).__name__.lower().startswith(method)) 4951 self.assertTrue( 4952 ctx.Process.__name__.lower().startswith(method)) 4953 self.check_context(multiprocessing) 4954 count += 1 4955 finally: 4956 multiprocessing.set_start_method(old_method, force=True) 4957 self.assertGreaterEqual(count, 1) 4958 4959 def test_get_all(self): 4960 methods = multiprocessing.get_all_start_methods() 4961 if sys.platform == 'win32': 4962 self.assertEqual(methods, ['spawn']) 4963 else: 4964 self.assertTrue(methods == ['fork', 'spawn'] or 4965 methods == ['fork', 'spawn', 'forkserver']) 4966 4967 def test_preload_resources(self): 4968 if multiprocessing.get_start_method() != 'forkserver': 4969 self.skipTest("test only relevant for 'forkserver' method") 4970 name = os.path.join(os.path.dirname(__file__), 'mp_preload.py') 4971 rc, out, err = test.support.script_helper.assert_python_ok(name) 4972 out = out.decode() 4973 err = err.decode() 4974 if out.rstrip() != 'ok' or err != '': 4975 print(out) 4976 print(err) 4977 self.fail("failed spawning forkserver or grandchild") 4978 4979 4980@unittest.skipIf(sys.platform == "win32", 4981 "test semantics don't make sense on Windows") 4982class TestResourceTracker(unittest.TestCase): 4983 4984 def test_resource_tracker(self): 4985 # 4986 # Check that killing process does not leak named semaphores 4987 # 4988 cmd = '''if 1: 4989 import time, os, tempfile 4990 import multiprocessing as mp 4991 from multiprocessing import resource_tracker 4992 from multiprocessing.shared_memory import SharedMemory 4993 4994 mp.set_start_method("spawn") 4995 rand = tempfile._RandomNameSequence() 4996 4997 4998 def create_and_register_resource(rtype): 4999 if rtype == "semaphore": 5000 lock = mp.Lock() 5001 return lock, lock._semlock.name 5002 elif rtype == "shared_memory": 5003 sm = SharedMemory(create=True, size=10) 5004 return sm, sm._name 5005 else: 5006 raise ValueError( 5007 "Resource type {{}} not understood".format(rtype)) 5008 5009 5010 resource1, rname1 = create_and_register_resource("{rtype}") 5011 resource2, rname2 = create_and_register_resource("{rtype}") 5012 5013 os.write({w}, rname1.encode("ascii") + b"\\n") 5014 os.write({w}, rname2.encode("ascii") + b"\\n") 5015 5016 time.sleep(10) 5017 ''' 5018 for rtype in resource_tracker._CLEANUP_FUNCS: 5019 with self.subTest(rtype=rtype): 5020 if rtype == "noop": 5021 # Artefact resource type used by the resource_tracker 5022 continue 5023 r, w = os.pipe() 5024 p = subprocess.Popen([sys.executable, 5025 '-E', '-c', cmd.format(w=w, rtype=rtype)], 5026 pass_fds=[w], 5027 stderr=subprocess.PIPE) 5028 os.close(w) 5029 with open(r, 'rb', closefd=True) as f: 5030 name1 = f.readline().rstrip().decode('ascii') 5031 name2 = f.readline().rstrip().decode('ascii') 5032 _resource_unlink(name1, rtype) 5033 p.terminate() 5034 p.wait() 5035 5036 deadline = time.monotonic() + 60 5037 while time.monotonic() < deadline: 5038 time.sleep(.5) 5039 try: 5040 _resource_unlink(name2, rtype) 5041 except OSError as e: 5042 # docs say it should be ENOENT, but OSX seems to give 5043 # EINVAL 5044 self.assertIn(e.errno, (errno.ENOENT, errno.EINVAL)) 5045 break 5046 else: 5047 raise AssertionError( 5048 f"A {rtype} resource was leaked after a process was " 5049 f"abruptly terminated.") 5050 err = p.stderr.read().decode('utf-8') 5051 p.stderr.close() 5052 expected = ('resource_tracker: There appear to be 2 leaked {} ' 5053 'objects'.format( 5054 rtype)) 5055 self.assertRegex(err, expected) 5056 self.assertRegex(err, r'resource_tracker: %r: \[Errno' % name1) 5057 5058 def check_resource_tracker_death(self, signum, should_die): 5059 # bpo-31310: if the semaphore tracker process has died, it should 5060 # be restarted implicitly. 5061 from multiprocessing.resource_tracker import _resource_tracker 5062 pid = _resource_tracker._pid 5063 if pid is not None: 5064 os.kill(pid, signal.SIGKILL) 5065 os.waitpid(pid, 0) 5066 with warnings.catch_warnings(): 5067 warnings.simplefilter("ignore") 5068 _resource_tracker.ensure_running() 5069 pid = _resource_tracker._pid 5070 5071 os.kill(pid, signum) 5072 time.sleep(1.0) # give it time to die 5073 5074 ctx = multiprocessing.get_context("spawn") 5075 with warnings.catch_warnings(record=True) as all_warn: 5076 warnings.simplefilter("always") 5077 sem = ctx.Semaphore() 5078 sem.acquire() 5079 sem.release() 5080 wr = weakref.ref(sem) 5081 # ensure `sem` gets collected, which triggers communication with 5082 # the semaphore tracker 5083 del sem 5084 gc.collect() 5085 self.assertIsNone(wr()) 5086 if should_die: 5087 self.assertEqual(len(all_warn), 1) 5088 the_warn = all_warn[0] 5089 self.assertTrue(issubclass(the_warn.category, UserWarning)) 5090 self.assertTrue("resource_tracker: process died" 5091 in str(the_warn.message)) 5092 else: 5093 self.assertEqual(len(all_warn), 0) 5094 5095 def test_resource_tracker_sigint(self): 5096 # Catchable signal (ignored by semaphore tracker) 5097 self.check_resource_tracker_death(signal.SIGINT, False) 5098 5099 def test_resource_tracker_sigterm(self): 5100 # Catchable signal (ignored by semaphore tracker) 5101 self.check_resource_tracker_death(signal.SIGTERM, False) 5102 5103 def test_resource_tracker_sigkill(self): 5104 # Uncatchable signal. 5105 self.check_resource_tracker_death(signal.SIGKILL, True) 5106 5107 @staticmethod 5108 def _is_resource_tracker_reused(conn, pid): 5109 from multiprocessing.resource_tracker import _resource_tracker 5110 _resource_tracker.ensure_running() 5111 # The pid should be None in the child process, expect for the fork 5112 # context. It should not be a new value. 5113 reused = _resource_tracker._pid in (None, pid) 5114 reused &= _resource_tracker._check_alive() 5115 conn.send(reused) 5116 5117 def test_resource_tracker_reused(self): 5118 from multiprocessing.resource_tracker import _resource_tracker 5119 _resource_tracker.ensure_running() 5120 pid = _resource_tracker._pid 5121 5122 r, w = multiprocessing.Pipe(duplex=False) 5123 p = multiprocessing.Process(target=self._is_resource_tracker_reused, 5124 args=(w, pid)) 5125 p.start() 5126 is_resource_tracker_reused = r.recv() 5127 5128 # Clean up 5129 p.join() 5130 w.close() 5131 r.close() 5132 5133 self.assertTrue(is_resource_tracker_reused) 5134 5135 5136class TestSimpleQueue(unittest.TestCase): 5137 5138 @classmethod 5139 def _test_empty(cls, queue, child_can_start, parent_can_continue): 5140 child_can_start.wait() 5141 # issue 30301, could fail under spawn and forkserver 5142 try: 5143 queue.put(queue.empty()) 5144 queue.put(queue.empty()) 5145 finally: 5146 parent_can_continue.set() 5147 5148 def test_empty(self): 5149 queue = multiprocessing.SimpleQueue() 5150 child_can_start = multiprocessing.Event() 5151 parent_can_continue = multiprocessing.Event() 5152 5153 proc = multiprocessing.Process( 5154 target=self._test_empty, 5155 args=(queue, child_can_start, parent_can_continue) 5156 ) 5157 proc.daemon = True 5158 proc.start() 5159 5160 self.assertTrue(queue.empty()) 5161 5162 child_can_start.set() 5163 parent_can_continue.wait() 5164 5165 self.assertFalse(queue.empty()) 5166 self.assertEqual(queue.get(), True) 5167 self.assertEqual(queue.get(), False) 5168 self.assertTrue(queue.empty()) 5169 5170 proc.join() 5171 5172 5173class TestPoolNotLeakOnFailure(unittest.TestCase): 5174 5175 def test_release_unused_processes(self): 5176 # Issue #19675: During pool creation, if we can't create a process, 5177 # don't leak already created ones. 5178 will_fail_in = 3 5179 forked_processes = [] 5180 5181 class FailingForkProcess: 5182 def __init__(self, **kwargs): 5183 self.name = 'Fake Process' 5184 self.exitcode = None 5185 self.state = None 5186 forked_processes.append(self) 5187 5188 def start(self): 5189 nonlocal will_fail_in 5190 if will_fail_in <= 0: 5191 raise OSError("Manually induced OSError") 5192 will_fail_in -= 1 5193 self.state = 'started' 5194 5195 def terminate(self): 5196 self.state = 'stopping' 5197 5198 def join(self): 5199 if self.state == 'stopping': 5200 self.state = 'stopped' 5201 5202 def is_alive(self): 5203 return self.state == 'started' or self.state == 'stopping' 5204 5205 with self.assertRaisesRegex(OSError, 'Manually induced OSError'): 5206 p = multiprocessing.pool.Pool(5, context=unittest.mock.MagicMock( 5207 Process=FailingForkProcess)) 5208 p.close() 5209 p.join() 5210 self.assertFalse( 5211 any(process.is_alive() for process in forked_processes)) 5212 5213 5214class TestSyncManagerTypes(unittest.TestCase): 5215 """Test all the types which can be shared between a parent and a 5216 child process by using a manager which acts as an intermediary 5217 between them. 5218 5219 In the following unit-tests the base type is created in the parent 5220 process, the @classmethod represents the worker process and the 5221 shared object is readable and editable between the two. 5222 5223 # The child. 5224 @classmethod 5225 def _test_list(cls, obj): 5226 assert obj[0] == 5 5227 assert obj.append(6) 5228 5229 # The parent. 5230 def test_list(self): 5231 o = self.manager.list() 5232 o.append(5) 5233 self.run_worker(self._test_list, o) 5234 assert o[1] == 6 5235 """ 5236 manager_class = multiprocessing.managers.SyncManager 5237 5238 def setUp(self): 5239 self.manager = self.manager_class() 5240 self.manager.start() 5241 self.proc = None 5242 5243 def tearDown(self): 5244 if self.proc is not None and self.proc.is_alive(): 5245 self.proc.terminate() 5246 self.proc.join() 5247 self.manager.shutdown() 5248 self.manager = None 5249 self.proc = None 5250 5251 @classmethod 5252 def setUpClass(cls): 5253 support.reap_children() 5254 5255 tearDownClass = setUpClass 5256 5257 def wait_proc_exit(self): 5258 # Only the manager process should be returned by active_children() 5259 # but this can take a bit on slow machines, so wait a few seconds 5260 # if there are other children too (see #17395). 5261 join_process(self.proc) 5262 start_time = time.monotonic() 5263 t = 0.01 5264 while len(multiprocessing.active_children()) > 1: 5265 time.sleep(t) 5266 t *= 2 5267 dt = time.monotonic() - start_time 5268 if dt >= 5.0: 5269 test.support.environment_altered = True 5270 print("Warning -- multiprocessing.Manager still has %s active " 5271 "children after %s seconds" 5272 % (multiprocessing.active_children(), dt), 5273 file=sys.stderr) 5274 break 5275 5276 def run_worker(self, worker, obj): 5277 self.proc = multiprocessing.Process(target=worker, args=(obj, )) 5278 self.proc.daemon = True 5279 self.proc.start() 5280 self.wait_proc_exit() 5281 self.assertEqual(self.proc.exitcode, 0) 5282 5283 @classmethod 5284 def _test_event(cls, obj): 5285 assert obj.is_set() 5286 obj.wait() 5287 obj.clear() 5288 obj.wait(0.001) 5289 5290 def test_event(self): 5291 o = self.manager.Event() 5292 o.set() 5293 self.run_worker(self._test_event, o) 5294 assert not o.is_set() 5295 o.wait(0.001) 5296 5297 @classmethod 5298 def _test_lock(cls, obj): 5299 obj.acquire() 5300 5301 def test_lock(self, lname="Lock"): 5302 o = getattr(self.manager, lname)() 5303 self.run_worker(self._test_lock, o) 5304 o.release() 5305 self.assertRaises(RuntimeError, o.release) # already released 5306 5307 @classmethod 5308 def _test_rlock(cls, obj): 5309 obj.acquire() 5310 obj.release() 5311 5312 def test_rlock(self, lname="Lock"): 5313 o = getattr(self.manager, lname)() 5314 self.run_worker(self._test_rlock, o) 5315 5316 @classmethod 5317 def _test_semaphore(cls, obj): 5318 obj.acquire() 5319 5320 def test_semaphore(self, sname="Semaphore"): 5321 o = getattr(self.manager, sname)() 5322 self.run_worker(self._test_semaphore, o) 5323 o.release() 5324 5325 def test_bounded_semaphore(self): 5326 self.test_semaphore(sname="BoundedSemaphore") 5327 5328 @classmethod 5329 def _test_condition(cls, obj): 5330 obj.acquire() 5331 obj.release() 5332 5333 def test_condition(self): 5334 o = self.manager.Condition() 5335 self.run_worker(self._test_condition, o) 5336 5337 @classmethod 5338 def _test_barrier(cls, obj): 5339 assert obj.parties == 5 5340 obj.reset() 5341 5342 def test_barrier(self): 5343 o = self.manager.Barrier(5) 5344 self.run_worker(self._test_barrier, o) 5345 5346 @classmethod 5347 def _test_pool(cls, obj): 5348 # TODO: fix https://bugs.python.org/issue35919 5349 with obj: 5350 pass 5351 5352 def test_pool(self): 5353 o = self.manager.Pool(processes=4) 5354 self.run_worker(self._test_pool, o) 5355 5356 @classmethod 5357 def _test_queue(cls, obj): 5358 assert obj.qsize() == 2 5359 assert obj.full() 5360 assert not obj.empty() 5361 assert obj.get() == 5 5362 assert not obj.empty() 5363 assert obj.get() == 6 5364 assert obj.empty() 5365 5366 def test_queue(self, qname="Queue"): 5367 o = getattr(self.manager, qname)(2) 5368 o.put(5) 5369 o.put(6) 5370 self.run_worker(self._test_queue, o) 5371 assert o.empty() 5372 assert not o.full() 5373 5374 def test_joinable_queue(self): 5375 self.test_queue("JoinableQueue") 5376 5377 @classmethod 5378 def _test_list(cls, obj): 5379 assert obj[0] == 5 5380 assert obj.count(5) == 1 5381 assert obj.index(5) == 0 5382 obj.sort() 5383 obj.reverse() 5384 for x in obj: 5385 pass 5386 assert len(obj) == 1 5387 assert obj.pop(0) == 5 5388 5389 def test_list(self): 5390 o = self.manager.list() 5391 o.append(5) 5392 self.run_worker(self._test_list, o) 5393 assert not o 5394 self.assertEqual(len(o), 0) 5395 5396 @classmethod 5397 def _test_dict(cls, obj): 5398 assert len(obj) == 1 5399 assert obj['foo'] == 5 5400 assert obj.get('foo') == 5 5401 assert list(obj.items()) == [('foo', 5)] 5402 assert list(obj.keys()) == ['foo'] 5403 assert list(obj.values()) == [5] 5404 assert obj.copy() == {'foo': 5} 5405 assert obj.popitem() == ('foo', 5) 5406 5407 def test_dict(self): 5408 o = self.manager.dict() 5409 o['foo'] = 5 5410 self.run_worker(self._test_dict, o) 5411 assert not o 5412 self.assertEqual(len(o), 0) 5413 5414 @classmethod 5415 def _test_value(cls, obj): 5416 assert obj.value == 1 5417 assert obj.get() == 1 5418 obj.set(2) 5419 5420 def test_value(self): 5421 o = self.manager.Value('i', 1) 5422 self.run_worker(self._test_value, o) 5423 self.assertEqual(o.value, 2) 5424 self.assertEqual(o.get(), 2) 5425 5426 @classmethod 5427 def _test_array(cls, obj): 5428 assert obj[0] == 0 5429 assert obj[1] == 1 5430 assert len(obj) == 2 5431 assert list(obj) == [0, 1] 5432 5433 def test_array(self): 5434 o = self.manager.Array('i', [0, 1]) 5435 self.run_worker(self._test_array, o) 5436 5437 @classmethod 5438 def _test_namespace(cls, obj): 5439 assert obj.x == 0 5440 assert obj.y == 1 5441 5442 def test_namespace(self): 5443 o = self.manager.Namespace() 5444 o.x = 0 5445 o.y = 1 5446 self.run_worker(self._test_namespace, o) 5447 5448 5449class MiscTestCase(unittest.TestCase): 5450 def test__all__(self): 5451 # Just make sure names in blacklist are excluded 5452 support.check__all__(self, multiprocessing, extra=multiprocessing.__all__, 5453 blacklist=['SUBDEBUG', 'SUBWARNING']) 5454# 5455# Mixins 5456# 5457 5458class BaseMixin(object): 5459 @classmethod 5460 def setUpClass(cls): 5461 cls.dangling = (multiprocessing.process._dangling.copy(), 5462 threading._dangling.copy()) 5463 5464 @classmethod 5465 def tearDownClass(cls): 5466 # bpo-26762: Some multiprocessing objects like Pool create reference 5467 # cycles. Trigger a garbage collection to break these cycles. 5468 test.support.gc_collect() 5469 5470 processes = set(multiprocessing.process._dangling) - set(cls.dangling[0]) 5471 if processes: 5472 test.support.environment_altered = True 5473 print('Warning -- Dangling processes: %s' % processes, 5474 file=sys.stderr) 5475 processes = None 5476 5477 threads = set(threading._dangling) - set(cls.dangling[1]) 5478 if threads: 5479 test.support.environment_altered = True 5480 print('Warning -- Dangling threads: %s' % threads, 5481 file=sys.stderr) 5482 threads = None 5483 5484 5485class ProcessesMixin(BaseMixin): 5486 TYPE = 'processes' 5487 Process = multiprocessing.Process 5488 connection = multiprocessing.connection 5489 current_process = staticmethod(multiprocessing.current_process) 5490 parent_process = staticmethod(multiprocessing.parent_process) 5491 active_children = staticmethod(multiprocessing.active_children) 5492 Pool = staticmethod(multiprocessing.Pool) 5493 Pipe = staticmethod(multiprocessing.Pipe) 5494 Queue = staticmethod(multiprocessing.Queue) 5495 JoinableQueue = staticmethod(multiprocessing.JoinableQueue) 5496 Lock = staticmethod(multiprocessing.Lock) 5497 RLock = staticmethod(multiprocessing.RLock) 5498 Semaphore = staticmethod(multiprocessing.Semaphore) 5499 BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore) 5500 Condition = staticmethod(multiprocessing.Condition) 5501 Event = staticmethod(multiprocessing.Event) 5502 Barrier = staticmethod(multiprocessing.Barrier) 5503 Value = staticmethod(multiprocessing.Value) 5504 Array = staticmethod(multiprocessing.Array) 5505 RawValue = staticmethod(multiprocessing.RawValue) 5506 RawArray = staticmethod(multiprocessing.RawArray) 5507 5508 5509class ManagerMixin(BaseMixin): 5510 TYPE = 'manager' 5511 Process = multiprocessing.Process 5512 Queue = property(operator.attrgetter('manager.Queue')) 5513 JoinableQueue = property(operator.attrgetter('manager.JoinableQueue')) 5514 Lock = property(operator.attrgetter('manager.Lock')) 5515 RLock = property(operator.attrgetter('manager.RLock')) 5516 Semaphore = property(operator.attrgetter('manager.Semaphore')) 5517 BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore')) 5518 Condition = property(operator.attrgetter('manager.Condition')) 5519 Event = property(operator.attrgetter('manager.Event')) 5520 Barrier = property(operator.attrgetter('manager.Barrier')) 5521 Value = property(operator.attrgetter('manager.Value')) 5522 Array = property(operator.attrgetter('manager.Array')) 5523 list = property(operator.attrgetter('manager.list')) 5524 dict = property(operator.attrgetter('manager.dict')) 5525 Namespace = property(operator.attrgetter('manager.Namespace')) 5526 5527 @classmethod 5528 def Pool(cls, *args, **kwds): 5529 return cls.manager.Pool(*args, **kwds) 5530 5531 @classmethod 5532 def setUpClass(cls): 5533 super().setUpClass() 5534 cls.manager = multiprocessing.Manager() 5535 5536 @classmethod 5537 def tearDownClass(cls): 5538 # only the manager process should be returned by active_children() 5539 # but this can take a bit on slow machines, so wait a few seconds 5540 # if there are other children too (see #17395) 5541 start_time = time.monotonic() 5542 t = 0.01 5543 while len(multiprocessing.active_children()) > 1: 5544 time.sleep(t) 5545 t *= 2 5546 dt = time.monotonic() - start_time 5547 if dt >= 5.0: 5548 test.support.environment_altered = True 5549 print("Warning -- multiprocessing.Manager still has %s active " 5550 "children after %s seconds" 5551 % (multiprocessing.active_children(), dt), 5552 file=sys.stderr) 5553 break 5554 5555 gc.collect() # do garbage collection 5556 if cls.manager._number_of_objects() != 0: 5557 # This is not really an error since some tests do not 5558 # ensure that all processes which hold a reference to a 5559 # managed object have been joined. 5560 test.support.environment_altered = True 5561 print('Warning -- Shared objects which still exist at manager ' 5562 'shutdown:') 5563 print(cls.manager._debug_info()) 5564 cls.manager.shutdown() 5565 cls.manager.join() 5566 cls.manager = None 5567 5568 super().tearDownClass() 5569 5570 5571class ThreadsMixin(BaseMixin): 5572 TYPE = 'threads' 5573 Process = multiprocessing.dummy.Process 5574 connection = multiprocessing.dummy.connection 5575 current_process = staticmethod(multiprocessing.dummy.current_process) 5576 active_children = staticmethod(multiprocessing.dummy.active_children) 5577 Pool = staticmethod(multiprocessing.dummy.Pool) 5578 Pipe = staticmethod(multiprocessing.dummy.Pipe) 5579 Queue = staticmethod(multiprocessing.dummy.Queue) 5580 JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue) 5581 Lock = staticmethod(multiprocessing.dummy.Lock) 5582 RLock = staticmethod(multiprocessing.dummy.RLock) 5583 Semaphore = staticmethod(multiprocessing.dummy.Semaphore) 5584 BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore) 5585 Condition = staticmethod(multiprocessing.dummy.Condition) 5586 Event = staticmethod(multiprocessing.dummy.Event) 5587 Barrier = staticmethod(multiprocessing.dummy.Barrier) 5588 Value = staticmethod(multiprocessing.dummy.Value) 5589 Array = staticmethod(multiprocessing.dummy.Array) 5590 5591# 5592# Functions used to create test cases from the base ones in this module 5593# 5594 5595def install_tests_in_module_dict(remote_globs, start_method): 5596 __module__ = remote_globs['__name__'] 5597 local_globs = globals() 5598 ALL_TYPES = {'processes', 'threads', 'manager'} 5599 5600 for name, base in local_globs.items(): 5601 if not isinstance(base, type): 5602 continue 5603 if issubclass(base, BaseTestCase): 5604 if base is BaseTestCase: 5605 continue 5606 assert set(base.ALLOWED_TYPES) <= ALL_TYPES, base.ALLOWED_TYPES 5607 for type_ in base.ALLOWED_TYPES: 5608 newname = 'With' + type_.capitalize() + name[1:] 5609 Mixin = local_globs[type_.capitalize() + 'Mixin'] 5610 class Temp(base, Mixin, unittest.TestCase): 5611 pass 5612 Temp.__name__ = Temp.__qualname__ = newname 5613 Temp.__module__ = __module__ 5614 remote_globs[newname] = Temp 5615 elif issubclass(base, unittest.TestCase): 5616 class Temp(base, object): 5617 pass 5618 Temp.__name__ = Temp.__qualname__ = name 5619 Temp.__module__ = __module__ 5620 remote_globs[name] = Temp 5621 5622 dangling = [None, None] 5623 old_start_method = [None] 5624 5625 def setUpModule(): 5626 multiprocessing.set_forkserver_preload(PRELOAD) 5627 multiprocessing.process._cleanup() 5628 dangling[0] = multiprocessing.process._dangling.copy() 5629 dangling[1] = threading._dangling.copy() 5630 old_start_method[0] = multiprocessing.get_start_method(allow_none=True) 5631 try: 5632 multiprocessing.set_start_method(start_method, force=True) 5633 except ValueError: 5634 raise unittest.SkipTest(start_method + 5635 ' start method not supported') 5636 5637 if sys.platform.startswith("linux"): 5638 try: 5639 lock = multiprocessing.RLock() 5640 except OSError: 5641 raise unittest.SkipTest("OSError raises on RLock creation, " 5642 "see issue 3111!") 5643 check_enough_semaphores() 5644 util.get_temp_dir() # creates temp directory 5645 multiprocessing.get_logger().setLevel(LOG_LEVEL) 5646 5647 def tearDownModule(): 5648 need_sleep = False 5649 5650 # bpo-26762: Some multiprocessing objects like Pool create reference 5651 # cycles. Trigger a garbage collection to break these cycles. 5652 test.support.gc_collect() 5653 5654 multiprocessing.set_start_method(old_start_method[0], force=True) 5655 # pause a bit so we don't get warning about dangling threads/processes 5656 processes = set(multiprocessing.process._dangling) - set(dangling[0]) 5657 if processes: 5658 need_sleep = True 5659 test.support.environment_altered = True 5660 print('Warning -- Dangling processes: %s' % processes, 5661 file=sys.stderr) 5662 processes = None 5663 5664 threads = set(threading._dangling) - set(dangling[1]) 5665 if threads: 5666 need_sleep = True 5667 test.support.environment_altered = True 5668 print('Warning -- Dangling threads: %s' % threads, 5669 file=sys.stderr) 5670 threads = None 5671 5672 # Sleep 500 ms to give time to child processes to complete. 5673 if need_sleep: 5674 time.sleep(0.5) 5675 5676 multiprocessing.util._cleanup_tests() 5677 5678 remote_globs['setUpModule'] = setUpModule 5679 remote_globs['tearDownModule'] = tearDownModule 5680