1# 2# Unit tests for the multiprocessing package 3# 4 5import unittest 6import queue as pyqueue 7import contextlib 8import time 9import io 10import itertools 11import sys 12import os 13import gc 14import errno 15import signal 16import array 17import socket 18import random 19import logging 20import struct 21import operator 22import weakref 23import test.support 24import test.support.script_helper 25from test import support 26 27 28# Skip tests if _multiprocessing wasn't built. 29_multiprocessing = test.support.import_module('_multiprocessing') 30# Skip tests if sem_open implementation is broken. 31test.support.import_module('multiprocessing.synchronize') 32# import threading after _multiprocessing to raise a more relevant error 33# message: "No module named _multiprocessing". _multiprocessing is not compiled 34# without thread support. 35import threading 36 37import multiprocessing.connection 38import multiprocessing.dummy 39import multiprocessing.heap 40import multiprocessing.managers 41import multiprocessing.pool 42import multiprocessing.queues 43 44from multiprocessing import util 45 46try: 47 from multiprocessing import reduction 48 HAS_REDUCTION = reduction.HAVE_SEND_HANDLE 49except ImportError: 50 HAS_REDUCTION = False 51 52try: 53 from multiprocessing.sharedctypes import Value, copy 54 HAS_SHAREDCTYPES = True 55except ImportError: 56 HAS_SHAREDCTYPES = False 57 58try: 59 import msvcrt 60except ImportError: 61 msvcrt = None 62 63# 64# 65# 66 67# Timeout to wait until a process completes 68TIMEOUT = 30.0 # seconds 69 70def latin(s): 71 return s.encode('latin') 72 73 74def close_queue(queue): 75 if isinstance(queue, multiprocessing.queues.Queue): 76 queue.close() 77 queue.join_thread() 78 79 80def join_process(process): 81 # Since multiprocessing.Process has the same API than threading.Thread 82 # (join() and is_alive(), the support function can be reused 83 support.join_thread(process, timeout=TIMEOUT) 84 85 86# 87# Constants 88# 89 90LOG_LEVEL = util.SUBWARNING 91#LOG_LEVEL = logging.DEBUG 92 93DELTA = 0.1 94CHECK_TIMINGS = False # making true makes tests take a lot longer 95 # and can sometimes cause some non-serious 96 # failures because some calls block a bit 97 # longer than expected 98if CHECK_TIMINGS: 99 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4 100else: 101 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1 102 103HAVE_GETVALUE = not getattr(_multiprocessing, 104 'HAVE_BROKEN_SEM_GETVALUE', False) 105 106WIN32 = (sys.platform == "win32") 107 108from multiprocessing.connection import wait 109 110def wait_for_handle(handle, timeout): 111 if timeout is not None and timeout < 0.0: 112 timeout = None 113 return wait([handle], timeout) 114 115try: 116 MAXFD = os.sysconf("SC_OPEN_MAX") 117except: 118 MAXFD = 256 119 120# To speed up tests when using the forkserver, we can preload these: 121PRELOAD = ['__main__', 'test.test_multiprocessing_forkserver'] 122 123# 124# Some tests require ctypes 125# 126 127try: 128 from ctypes import Structure, c_int, c_double, c_longlong 129except ImportError: 130 Structure = object 131 c_int = c_double = c_longlong = None 132 133 134def check_enough_semaphores(): 135 """Check that the system supports enough semaphores to run the test.""" 136 # minimum number of semaphores available according to POSIX 137 nsems_min = 256 138 try: 139 nsems = os.sysconf("SC_SEM_NSEMS_MAX") 140 except (AttributeError, ValueError): 141 # sysconf not available or setting not available 142 return 143 if nsems == -1 or nsems >= nsems_min: 144 return 145 raise unittest.SkipTest("The OS doesn't support enough semaphores " 146 "to run the test (required: %d)." % nsems_min) 147 148 149# 150# Creates a wrapper for a function which records the time it takes to finish 151# 152 153class TimingWrapper(object): 154 155 def __init__(self, func): 156 self.func = func 157 self.elapsed = None 158 159 def __call__(self, *args, **kwds): 160 t = time.monotonic() 161 try: 162 return self.func(*args, **kwds) 163 finally: 164 self.elapsed = time.monotonic() - t 165 166# 167# Base class for test cases 168# 169 170class BaseTestCase(object): 171 172 ALLOWED_TYPES = ('processes', 'manager', 'threads') 173 174 def assertTimingAlmostEqual(self, a, b): 175 if CHECK_TIMINGS: 176 self.assertAlmostEqual(a, b, 1) 177 178 def assertReturnsIfImplemented(self, value, func, *args): 179 try: 180 res = func(*args) 181 except NotImplementedError: 182 pass 183 else: 184 return self.assertEqual(value, res) 185 186 # For the sanity of Windows users, rather than crashing or freezing in 187 # multiple ways. 188 def __reduce__(self, *args): 189 raise NotImplementedError("shouldn't try to pickle a test case") 190 191 __reduce_ex__ = __reduce__ 192 193# 194# Return the value of a semaphore 195# 196 197def get_value(self): 198 try: 199 return self.get_value() 200 except AttributeError: 201 try: 202 return self._Semaphore__value 203 except AttributeError: 204 try: 205 return self._value 206 except AttributeError: 207 raise NotImplementedError 208 209# 210# Testcases 211# 212 213class DummyCallable: 214 def __call__(self, q, c): 215 assert isinstance(c, DummyCallable) 216 q.put(5) 217 218 219class _TestProcess(BaseTestCase): 220 221 ALLOWED_TYPES = ('processes', 'threads') 222 223 def test_current(self): 224 if self.TYPE == 'threads': 225 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 226 227 current = self.current_process() 228 authkey = current.authkey 229 230 self.assertTrue(current.is_alive()) 231 self.assertTrue(not current.daemon) 232 self.assertIsInstance(authkey, bytes) 233 self.assertTrue(len(authkey) > 0) 234 self.assertEqual(current.ident, os.getpid()) 235 self.assertEqual(current.exitcode, None) 236 237 def test_daemon_argument(self): 238 if self.TYPE == "threads": 239 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 240 241 # By default uses the current process's daemon flag. 242 proc0 = self.Process(target=self._test) 243 self.assertEqual(proc0.daemon, self.current_process().daemon) 244 proc1 = self.Process(target=self._test, daemon=True) 245 self.assertTrue(proc1.daemon) 246 proc2 = self.Process(target=self._test, daemon=False) 247 self.assertFalse(proc2.daemon) 248 249 @classmethod 250 def _test(cls, q, *args, **kwds): 251 current = cls.current_process() 252 q.put(args) 253 q.put(kwds) 254 q.put(current.name) 255 if cls.TYPE != 'threads': 256 q.put(bytes(current.authkey)) 257 q.put(current.pid) 258 259 def test_process(self): 260 q = self.Queue(1) 261 e = self.Event() 262 args = (q, 1, 2) 263 kwargs = {'hello':23, 'bye':2.54} 264 name = 'SomeProcess' 265 p = self.Process( 266 target=self._test, args=args, kwargs=kwargs, name=name 267 ) 268 p.daemon = True 269 current = self.current_process() 270 271 if self.TYPE != 'threads': 272 self.assertEqual(p.authkey, current.authkey) 273 self.assertEqual(p.is_alive(), False) 274 self.assertEqual(p.daemon, True) 275 self.assertNotIn(p, self.active_children()) 276 self.assertTrue(type(self.active_children()) is list) 277 self.assertEqual(p.exitcode, None) 278 279 p.start() 280 281 self.assertEqual(p.exitcode, None) 282 self.assertEqual(p.is_alive(), True) 283 self.assertIn(p, self.active_children()) 284 285 self.assertEqual(q.get(), args[1:]) 286 self.assertEqual(q.get(), kwargs) 287 self.assertEqual(q.get(), p.name) 288 if self.TYPE != 'threads': 289 self.assertEqual(q.get(), current.authkey) 290 self.assertEqual(q.get(), p.pid) 291 292 p.join() 293 294 self.assertEqual(p.exitcode, 0) 295 self.assertEqual(p.is_alive(), False) 296 self.assertNotIn(p, self.active_children()) 297 close_queue(q) 298 299 @classmethod 300 def _sleep_some(cls): 301 time.sleep(100) 302 303 @classmethod 304 def _test_sleep(cls, delay): 305 time.sleep(delay) 306 307 def _kill_process(self, meth): 308 if self.TYPE == 'threads': 309 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 310 311 p = self.Process(target=self._sleep_some) 312 p.daemon = True 313 p.start() 314 315 self.assertEqual(p.is_alive(), True) 316 self.assertIn(p, self.active_children()) 317 self.assertEqual(p.exitcode, None) 318 319 join = TimingWrapper(p.join) 320 321 self.assertEqual(join(0), None) 322 self.assertTimingAlmostEqual(join.elapsed, 0.0) 323 self.assertEqual(p.is_alive(), True) 324 325 self.assertEqual(join(-1), None) 326 self.assertTimingAlmostEqual(join.elapsed, 0.0) 327 self.assertEqual(p.is_alive(), True) 328 329 # XXX maybe terminating too soon causes the problems on Gentoo... 330 time.sleep(1) 331 332 meth(p) 333 334 if hasattr(signal, 'alarm'): 335 # On the Gentoo buildbot waitpid() often seems to block forever. 336 # We use alarm() to interrupt it if it blocks for too long. 337 def handler(*args): 338 raise RuntimeError('join took too long: %s' % p) 339 old_handler = signal.signal(signal.SIGALRM, handler) 340 try: 341 signal.alarm(10) 342 self.assertEqual(join(), None) 343 finally: 344 signal.alarm(0) 345 signal.signal(signal.SIGALRM, old_handler) 346 else: 347 self.assertEqual(join(), None) 348 349 self.assertTimingAlmostEqual(join.elapsed, 0.0) 350 351 self.assertEqual(p.is_alive(), False) 352 self.assertNotIn(p, self.active_children()) 353 354 p.join() 355 356 return p.exitcode 357 358 def test_terminate(self): 359 exitcode = self._kill_process(multiprocessing.Process.terminate) 360 if os.name != 'nt': 361 self.assertEqual(exitcode, -signal.SIGTERM) 362 363 def test_kill(self): 364 exitcode = self._kill_process(multiprocessing.Process.kill) 365 if os.name != 'nt': 366 self.assertEqual(exitcode, -signal.SIGKILL) 367 368 def test_cpu_count(self): 369 try: 370 cpus = multiprocessing.cpu_count() 371 except NotImplementedError: 372 cpus = 1 373 self.assertTrue(type(cpus) is int) 374 self.assertTrue(cpus >= 1) 375 376 def test_active_children(self): 377 self.assertEqual(type(self.active_children()), list) 378 379 p = self.Process(target=time.sleep, args=(DELTA,)) 380 self.assertNotIn(p, self.active_children()) 381 382 p.daemon = True 383 p.start() 384 self.assertIn(p, self.active_children()) 385 386 p.join() 387 self.assertNotIn(p, self.active_children()) 388 389 @classmethod 390 def _test_recursion(cls, wconn, id): 391 wconn.send(id) 392 if len(id) < 2: 393 for i in range(2): 394 p = cls.Process( 395 target=cls._test_recursion, args=(wconn, id+[i]) 396 ) 397 p.start() 398 p.join() 399 400 def test_recursion(self): 401 rconn, wconn = self.Pipe(duplex=False) 402 self._test_recursion(wconn, []) 403 404 time.sleep(DELTA) 405 result = [] 406 while rconn.poll(): 407 result.append(rconn.recv()) 408 409 expected = [ 410 [], 411 [0], 412 [0, 0], 413 [0, 1], 414 [1], 415 [1, 0], 416 [1, 1] 417 ] 418 self.assertEqual(result, expected) 419 420 @classmethod 421 def _test_sentinel(cls, event): 422 event.wait(10.0) 423 424 def test_sentinel(self): 425 if self.TYPE == "threads": 426 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 427 event = self.Event() 428 p = self.Process(target=self._test_sentinel, args=(event,)) 429 with self.assertRaises(ValueError): 430 p.sentinel 431 p.start() 432 self.addCleanup(p.join) 433 sentinel = p.sentinel 434 self.assertIsInstance(sentinel, int) 435 self.assertFalse(wait_for_handle(sentinel, timeout=0.0)) 436 event.set() 437 p.join() 438 self.assertTrue(wait_for_handle(sentinel, timeout=1)) 439 440 @classmethod 441 def _test_close(cls, rc=0, q=None): 442 if q is not None: 443 q.get() 444 sys.exit(rc) 445 446 def test_close(self): 447 if self.TYPE == "threads": 448 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 449 q = self.Queue() 450 p = self.Process(target=self._test_close, kwargs={'q': q}) 451 p.daemon = True 452 p.start() 453 self.assertEqual(p.is_alive(), True) 454 # Child is still alive, cannot close 455 with self.assertRaises(ValueError): 456 p.close() 457 458 q.put(None) 459 p.join() 460 self.assertEqual(p.is_alive(), False) 461 self.assertEqual(p.exitcode, 0) 462 p.close() 463 with self.assertRaises(ValueError): 464 p.is_alive() 465 with self.assertRaises(ValueError): 466 p.join() 467 with self.assertRaises(ValueError): 468 p.terminate() 469 p.close() 470 471 wr = weakref.ref(p) 472 del p 473 gc.collect() 474 self.assertIs(wr(), None) 475 476 close_queue(q) 477 478 def test_many_processes(self): 479 if self.TYPE == 'threads': 480 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 481 482 sm = multiprocessing.get_start_method() 483 N = 5 if sm == 'spawn' else 100 484 485 # Try to overwhelm the forkserver loop with events 486 procs = [self.Process(target=self._test_sleep, args=(0.01,)) 487 for i in range(N)] 488 for p in procs: 489 p.start() 490 for p in procs: 491 join_process(p) 492 for p in procs: 493 self.assertEqual(p.exitcode, 0) 494 495 procs = [self.Process(target=self._sleep_some) 496 for i in range(N)] 497 for p in procs: 498 p.start() 499 time.sleep(0.001) # let the children start... 500 for p in procs: 501 p.terminate() 502 for p in procs: 503 join_process(p) 504 if os.name != 'nt': 505 exitcodes = [-signal.SIGTERM] 506 if sys.platform == 'darwin': 507 # bpo-31510: On macOS, killing a freshly started process with 508 # SIGTERM sometimes kills the process with SIGKILL. 509 exitcodes.append(-signal.SIGKILL) 510 for p in procs: 511 self.assertIn(p.exitcode, exitcodes) 512 513 def test_lose_target_ref(self): 514 c = DummyCallable() 515 wr = weakref.ref(c) 516 q = self.Queue() 517 p = self.Process(target=c, args=(q, c)) 518 del c 519 p.start() 520 p.join() 521 self.assertIs(wr(), None) 522 self.assertEqual(q.get(), 5) 523 close_queue(q) 524 525 @classmethod 526 def _test_child_fd_inflation(self, evt, q): 527 q.put(test.support.fd_count()) 528 evt.wait() 529 530 def test_child_fd_inflation(self): 531 # Number of fds in child processes should not grow with the 532 # number of running children. 533 if self.TYPE == 'threads': 534 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 535 536 sm = multiprocessing.get_start_method() 537 if sm == 'fork': 538 # The fork method by design inherits all fds from the parent, 539 # trying to go against it is a lost battle 540 self.skipTest('test not appropriate for {}'.format(sm)) 541 542 N = 5 543 evt = self.Event() 544 q = self.Queue() 545 546 procs = [self.Process(target=self._test_child_fd_inflation, args=(evt, q)) 547 for i in range(N)] 548 for p in procs: 549 p.start() 550 551 try: 552 fd_counts = [q.get() for i in range(N)] 553 self.assertEqual(len(set(fd_counts)), 1, fd_counts) 554 555 finally: 556 evt.set() 557 for p in procs: 558 p.join() 559 close_queue(q) 560 561 @classmethod 562 def _test_wait_for_threads(self, evt): 563 def func1(): 564 time.sleep(0.5) 565 evt.set() 566 567 def func2(): 568 time.sleep(20) 569 evt.clear() 570 571 threading.Thread(target=func1).start() 572 threading.Thread(target=func2, daemon=True).start() 573 574 def test_wait_for_threads(self): 575 # A child process should wait for non-daemonic threads to end 576 # before exiting 577 if self.TYPE == 'threads': 578 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 579 580 evt = self.Event() 581 proc = self.Process(target=self._test_wait_for_threads, args=(evt,)) 582 proc.start() 583 proc.join() 584 self.assertTrue(evt.is_set()) 585 586 @classmethod 587 def _test_error_on_stdio_flush(self, evt, break_std_streams={}): 588 for stream_name, action in break_std_streams.items(): 589 if action == 'close': 590 stream = io.StringIO() 591 stream.close() 592 else: 593 assert action == 'remove' 594 stream = None 595 setattr(sys, stream_name, None) 596 evt.set() 597 598 def test_error_on_stdio_flush_1(self): 599 # Check that Process works with broken standard streams 600 streams = [io.StringIO(), None] 601 streams[0].close() 602 for stream_name in ('stdout', 'stderr'): 603 for stream in streams: 604 old_stream = getattr(sys, stream_name) 605 setattr(sys, stream_name, stream) 606 try: 607 evt = self.Event() 608 proc = self.Process(target=self._test_error_on_stdio_flush, 609 args=(evt,)) 610 proc.start() 611 proc.join() 612 self.assertTrue(evt.is_set()) 613 self.assertEqual(proc.exitcode, 0) 614 finally: 615 setattr(sys, stream_name, old_stream) 616 617 def test_error_on_stdio_flush_2(self): 618 # Same as test_error_on_stdio_flush_1(), but standard streams are 619 # broken by the child process 620 for stream_name in ('stdout', 'stderr'): 621 for action in ('close', 'remove'): 622 old_stream = getattr(sys, stream_name) 623 try: 624 evt = self.Event() 625 proc = self.Process(target=self._test_error_on_stdio_flush, 626 args=(evt, {stream_name: action})) 627 proc.start() 628 proc.join() 629 self.assertTrue(evt.is_set()) 630 self.assertEqual(proc.exitcode, 0) 631 finally: 632 setattr(sys, stream_name, old_stream) 633 634 @classmethod 635 def _sleep_and_set_event(self, evt, delay=0.0): 636 time.sleep(delay) 637 evt.set() 638 639 def check_forkserver_death(self, signum): 640 # bpo-31308: if the forkserver process has died, we should still 641 # be able to create and run new Process instances (the forkserver 642 # is implicitly restarted). 643 if self.TYPE == 'threads': 644 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 645 sm = multiprocessing.get_start_method() 646 if sm != 'forkserver': 647 # The fork method by design inherits all fds from the parent, 648 # trying to go against it is a lost battle 649 self.skipTest('test not appropriate for {}'.format(sm)) 650 651 from multiprocessing.forkserver import _forkserver 652 _forkserver.ensure_running() 653 654 # First process sleeps 500 ms 655 delay = 0.5 656 657 evt = self.Event() 658 proc = self.Process(target=self._sleep_and_set_event, args=(evt, delay)) 659 proc.start() 660 661 pid = _forkserver._forkserver_pid 662 os.kill(pid, signum) 663 # give time to the fork server to die and time to proc to complete 664 time.sleep(delay * 2.0) 665 666 evt2 = self.Event() 667 proc2 = self.Process(target=self._sleep_and_set_event, args=(evt2,)) 668 proc2.start() 669 proc2.join() 670 self.assertTrue(evt2.is_set()) 671 self.assertEqual(proc2.exitcode, 0) 672 673 proc.join() 674 self.assertTrue(evt.is_set()) 675 self.assertIn(proc.exitcode, (0, 255)) 676 677 def test_forkserver_sigint(self): 678 # Catchable signal 679 self.check_forkserver_death(signal.SIGINT) 680 681 def test_forkserver_sigkill(self): 682 # Uncatchable signal 683 if os.name != 'nt': 684 self.check_forkserver_death(signal.SIGKILL) 685 686 687# 688# 689# 690 691class _UpperCaser(multiprocessing.Process): 692 693 def __init__(self): 694 multiprocessing.Process.__init__(self) 695 self.child_conn, self.parent_conn = multiprocessing.Pipe() 696 697 def run(self): 698 self.parent_conn.close() 699 for s in iter(self.child_conn.recv, None): 700 self.child_conn.send(s.upper()) 701 self.child_conn.close() 702 703 def submit(self, s): 704 assert type(s) is str 705 self.parent_conn.send(s) 706 return self.parent_conn.recv() 707 708 def stop(self): 709 self.parent_conn.send(None) 710 self.parent_conn.close() 711 self.child_conn.close() 712 713class _TestSubclassingProcess(BaseTestCase): 714 715 ALLOWED_TYPES = ('processes',) 716 717 def test_subclassing(self): 718 uppercaser = _UpperCaser() 719 uppercaser.daemon = True 720 uppercaser.start() 721 self.assertEqual(uppercaser.submit('hello'), 'HELLO') 722 self.assertEqual(uppercaser.submit('world'), 'WORLD') 723 uppercaser.stop() 724 uppercaser.join() 725 726 def test_stderr_flush(self): 727 # sys.stderr is flushed at process shutdown (issue #13812) 728 if self.TYPE == "threads": 729 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 730 731 testfn = test.support.TESTFN 732 self.addCleanup(test.support.unlink, testfn) 733 proc = self.Process(target=self._test_stderr_flush, args=(testfn,)) 734 proc.start() 735 proc.join() 736 with open(testfn, 'r') as f: 737 err = f.read() 738 # The whole traceback was printed 739 self.assertIn("ZeroDivisionError", err) 740 self.assertIn("test_multiprocessing.py", err) 741 self.assertIn("1/0 # MARKER", err) 742 743 @classmethod 744 def _test_stderr_flush(cls, testfn): 745 fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL) 746 sys.stderr = open(fd, 'w', closefd=False) 747 1/0 # MARKER 748 749 750 @classmethod 751 def _test_sys_exit(cls, reason, testfn): 752 fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL) 753 sys.stderr = open(fd, 'w', closefd=False) 754 sys.exit(reason) 755 756 def test_sys_exit(self): 757 # See Issue 13854 758 if self.TYPE == 'threads': 759 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 760 761 testfn = test.support.TESTFN 762 self.addCleanup(test.support.unlink, testfn) 763 764 for reason in ( 765 [1, 2, 3], 766 'ignore this', 767 ): 768 p = self.Process(target=self._test_sys_exit, args=(reason, testfn)) 769 p.daemon = True 770 p.start() 771 join_process(p) 772 self.assertEqual(p.exitcode, 1) 773 774 with open(testfn, 'r') as f: 775 content = f.read() 776 self.assertEqual(content.rstrip(), str(reason)) 777 778 os.unlink(testfn) 779 780 for reason in (True, False, 8): 781 p = self.Process(target=sys.exit, args=(reason,)) 782 p.daemon = True 783 p.start() 784 join_process(p) 785 self.assertEqual(p.exitcode, reason) 786 787# 788# 789# 790 791def queue_empty(q): 792 if hasattr(q, 'empty'): 793 return q.empty() 794 else: 795 return q.qsize() == 0 796 797def queue_full(q, maxsize): 798 if hasattr(q, 'full'): 799 return q.full() 800 else: 801 return q.qsize() == maxsize 802 803 804class _TestQueue(BaseTestCase): 805 806 807 @classmethod 808 def _test_put(cls, queue, child_can_start, parent_can_continue): 809 child_can_start.wait() 810 for i in range(6): 811 queue.get() 812 parent_can_continue.set() 813 814 def test_put(self): 815 MAXSIZE = 6 816 queue = self.Queue(maxsize=MAXSIZE) 817 child_can_start = self.Event() 818 parent_can_continue = self.Event() 819 820 proc = self.Process( 821 target=self._test_put, 822 args=(queue, child_can_start, parent_can_continue) 823 ) 824 proc.daemon = True 825 proc.start() 826 827 self.assertEqual(queue_empty(queue), True) 828 self.assertEqual(queue_full(queue, MAXSIZE), False) 829 830 queue.put(1) 831 queue.put(2, True) 832 queue.put(3, True, None) 833 queue.put(4, False) 834 queue.put(5, False, None) 835 queue.put_nowait(6) 836 837 # the values may be in buffer but not yet in pipe so sleep a bit 838 time.sleep(DELTA) 839 840 self.assertEqual(queue_empty(queue), False) 841 self.assertEqual(queue_full(queue, MAXSIZE), True) 842 843 put = TimingWrapper(queue.put) 844 put_nowait = TimingWrapper(queue.put_nowait) 845 846 self.assertRaises(pyqueue.Full, put, 7, False) 847 self.assertTimingAlmostEqual(put.elapsed, 0) 848 849 self.assertRaises(pyqueue.Full, put, 7, False, None) 850 self.assertTimingAlmostEqual(put.elapsed, 0) 851 852 self.assertRaises(pyqueue.Full, put_nowait, 7) 853 self.assertTimingAlmostEqual(put_nowait.elapsed, 0) 854 855 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1) 856 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1) 857 858 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2) 859 self.assertTimingAlmostEqual(put.elapsed, 0) 860 861 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3) 862 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3) 863 864 child_can_start.set() 865 parent_can_continue.wait() 866 867 self.assertEqual(queue_empty(queue), True) 868 self.assertEqual(queue_full(queue, MAXSIZE), False) 869 870 proc.join() 871 close_queue(queue) 872 873 @classmethod 874 def _test_get(cls, queue, child_can_start, parent_can_continue): 875 child_can_start.wait() 876 #queue.put(1) 877 queue.put(2) 878 queue.put(3) 879 queue.put(4) 880 queue.put(5) 881 parent_can_continue.set() 882 883 def test_get(self): 884 queue = self.Queue() 885 child_can_start = self.Event() 886 parent_can_continue = self.Event() 887 888 proc = self.Process( 889 target=self._test_get, 890 args=(queue, child_can_start, parent_can_continue) 891 ) 892 proc.daemon = True 893 proc.start() 894 895 self.assertEqual(queue_empty(queue), True) 896 897 child_can_start.set() 898 parent_can_continue.wait() 899 900 time.sleep(DELTA) 901 self.assertEqual(queue_empty(queue), False) 902 903 # Hangs unexpectedly, remove for now 904 #self.assertEqual(queue.get(), 1) 905 self.assertEqual(queue.get(True, None), 2) 906 self.assertEqual(queue.get(True), 3) 907 self.assertEqual(queue.get(timeout=1), 4) 908 self.assertEqual(queue.get_nowait(), 5) 909 910 self.assertEqual(queue_empty(queue), True) 911 912 get = TimingWrapper(queue.get) 913 get_nowait = TimingWrapper(queue.get_nowait) 914 915 self.assertRaises(pyqueue.Empty, get, False) 916 self.assertTimingAlmostEqual(get.elapsed, 0) 917 918 self.assertRaises(pyqueue.Empty, get, False, None) 919 self.assertTimingAlmostEqual(get.elapsed, 0) 920 921 self.assertRaises(pyqueue.Empty, get_nowait) 922 self.assertTimingAlmostEqual(get_nowait.elapsed, 0) 923 924 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1) 925 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1) 926 927 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2) 928 self.assertTimingAlmostEqual(get.elapsed, 0) 929 930 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3) 931 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3) 932 933 proc.join() 934 close_queue(queue) 935 936 @classmethod 937 def _test_fork(cls, queue): 938 for i in range(10, 20): 939 queue.put(i) 940 # note that at this point the items may only be buffered, so the 941 # process cannot shutdown until the feeder thread has finished 942 # pushing items onto the pipe. 943 944 def test_fork(self): 945 # Old versions of Queue would fail to create a new feeder 946 # thread for a forked process if the original process had its 947 # own feeder thread. This test checks that this no longer 948 # happens. 949 950 queue = self.Queue() 951 952 # put items on queue so that main process starts a feeder thread 953 for i in range(10): 954 queue.put(i) 955 956 # wait to make sure thread starts before we fork a new process 957 time.sleep(DELTA) 958 959 # fork process 960 p = self.Process(target=self._test_fork, args=(queue,)) 961 p.daemon = True 962 p.start() 963 964 # check that all expected items are in the queue 965 for i in range(20): 966 self.assertEqual(queue.get(), i) 967 self.assertRaises(pyqueue.Empty, queue.get, False) 968 969 p.join() 970 close_queue(queue) 971 972 def test_qsize(self): 973 q = self.Queue() 974 try: 975 self.assertEqual(q.qsize(), 0) 976 except NotImplementedError: 977 self.skipTest('qsize method not implemented') 978 q.put(1) 979 self.assertEqual(q.qsize(), 1) 980 q.put(5) 981 self.assertEqual(q.qsize(), 2) 982 q.get() 983 self.assertEqual(q.qsize(), 1) 984 q.get() 985 self.assertEqual(q.qsize(), 0) 986 close_queue(q) 987 988 @classmethod 989 def _test_task_done(cls, q): 990 for obj in iter(q.get, None): 991 time.sleep(DELTA) 992 q.task_done() 993 994 def test_task_done(self): 995 queue = self.JoinableQueue() 996 997 workers = [self.Process(target=self._test_task_done, args=(queue,)) 998 for i in range(4)] 999 1000 for p in workers: 1001 p.daemon = True 1002 p.start() 1003 1004 for i in range(10): 1005 queue.put(i) 1006 1007 queue.join() 1008 1009 for p in workers: 1010 queue.put(None) 1011 1012 for p in workers: 1013 p.join() 1014 close_queue(queue) 1015 1016 def test_no_import_lock_contention(self): 1017 with test.support.temp_cwd(): 1018 module_name = 'imported_by_an_imported_module' 1019 with open(module_name + '.py', 'w') as f: 1020 f.write("""if 1: 1021 import multiprocessing 1022 1023 q = multiprocessing.Queue() 1024 q.put('knock knock') 1025 q.get(timeout=3) 1026 q.close() 1027 del q 1028 """) 1029 1030 with test.support.DirsOnSysPath(os.getcwd()): 1031 try: 1032 __import__(module_name) 1033 except pyqueue.Empty: 1034 self.fail("Probable regression on import lock contention;" 1035 " see Issue #22853") 1036 1037 def test_timeout(self): 1038 q = multiprocessing.Queue() 1039 start = time.monotonic() 1040 self.assertRaises(pyqueue.Empty, q.get, True, 0.200) 1041 delta = time.monotonic() - start 1042 # bpo-30317: Tolerate a delta of 100 ms because of the bad clock 1043 # resolution on Windows (usually 15.6 ms). x86 Windows7 3.x once 1044 # failed because the delta was only 135.8 ms. 1045 self.assertGreaterEqual(delta, 0.100) 1046 close_queue(q) 1047 1048 def test_queue_feeder_donot_stop_onexc(self): 1049 # bpo-30414: verify feeder handles exceptions correctly 1050 if self.TYPE != 'processes': 1051 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1052 1053 class NotSerializable(object): 1054 def __reduce__(self): 1055 raise AttributeError 1056 with test.support.captured_stderr(): 1057 q = self.Queue() 1058 q.put(NotSerializable()) 1059 q.put(True) 1060 # bpo-30595: use a timeout of 1 second for slow buildbots 1061 self.assertTrue(q.get(timeout=1.0)) 1062 close_queue(q) 1063 1064 with test.support.captured_stderr(): 1065 # bpo-33078: verify that the queue size is correctly handled 1066 # on errors. 1067 q = self.Queue(maxsize=1) 1068 q.put(NotSerializable()) 1069 q.put(True) 1070 try: 1071 self.assertEqual(q.qsize(), 1) 1072 except NotImplementedError: 1073 # qsize is not available on all platform as it 1074 # relies on sem_getvalue 1075 pass 1076 # bpo-30595: use a timeout of 1 second for slow buildbots 1077 self.assertTrue(q.get(timeout=1.0)) 1078 # Check that the size of the queue is correct 1079 self.assertTrue(q.empty()) 1080 close_queue(q) 1081 1082 def test_queue_feeder_on_queue_feeder_error(self): 1083 # bpo-30006: verify feeder handles exceptions using the 1084 # _on_queue_feeder_error hook. 1085 if self.TYPE != 'processes': 1086 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1087 1088 class NotSerializable(object): 1089 """Mock unserializable object""" 1090 def __init__(self): 1091 self.reduce_was_called = False 1092 self.on_queue_feeder_error_was_called = False 1093 1094 def __reduce__(self): 1095 self.reduce_was_called = True 1096 raise AttributeError 1097 1098 class SafeQueue(multiprocessing.queues.Queue): 1099 """Queue with overloaded _on_queue_feeder_error hook""" 1100 @staticmethod 1101 def _on_queue_feeder_error(e, obj): 1102 if (isinstance(e, AttributeError) and 1103 isinstance(obj, NotSerializable)): 1104 obj.on_queue_feeder_error_was_called = True 1105 1106 not_serializable_obj = NotSerializable() 1107 # The captured_stderr reduces the noise in the test report 1108 with test.support.captured_stderr(): 1109 q = SafeQueue(ctx=multiprocessing.get_context()) 1110 q.put(not_serializable_obj) 1111 1112 # Verify that q is still functioning correctly 1113 q.put(True) 1114 self.assertTrue(q.get(timeout=1.0)) 1115 1116 # Assert that the serialization and the hook have been called correctly 1117 self.assertTrue(not_serializable_obj.reduce_was_called) 1118 self.assertTrue(not_serializable_obj.on_queue_feeder_error_was_called) 1119# 1120# 1121# 1122 1123class _TestLock(BaseTestCase): 1124 1125 def test_lock(self): 1126 lock = self.Lock() 1127 self.assertEqual(lock.acquire(), True) 1128 self.assertEqual(lock.acquire(False), False) 1129 self.assertEqual(lock.release(), None) 1130 self.assertRaises((ValueError, threading.ThreadError), lock.release) 1131 1132 def test_rlock(self): 1133 lock = self.RLock() 1134 self.assertEqual(lock.acquire(), True) 1135 self.assertEqual(lock.acquire(), True) 1136 self.assertEqual(lock.acquire(), True) 1137 self.assertEqual(lock.release(), None) 1138 self.assertEqual(lock.release(), None) 1139 self.assertEqual(lock.release(), None) 1140 self.assertRaises((AssertionError, RuntimeError), lock.release) 1141 1142 def test_lock_context(self): 1143 with self.Lock(): 1144 pass 1145 1146 1147class _TestSemaphore(BaseTestCase): 1148 1149 def _test_semaphore(self, sem): 1150 self.assertReturnsIfImplemented(2, get_value, sem) 1151 self.assertEqual(sem.acquire(), True) 1152 self.assertReturnsIfImplemented(1, get_value, sem) 1153 self.assertEqual(sem.acquire(), True) 1154 self.assertReturnsIfImplemented(0, get_value, sem) 1155 self.assertEqual(sem.acquire(False), False) 1156 self.assertReturnsIfImplemented(0, get_value, sem) 1157 self.assertEqual(sem.release(), None) 1158 self.assertReturnsIfImplemented(1, get_value, sem) 1159 self.assertEqual(sem.release(), None) 1160 self.assertReturnsIfImplemented(2, get_value, sem) 1161 1162 def test_semaphore(self): 1163 sem = self.Semaphore(2) 1164 self._test_semaphore(sem) 1165 self.assertEqual(sem.release(), None) 1166 self.assertReturnsIfImplemented(3, get_value, sem) 1167 self.assertEqual(sem.release(), None) 1168 self.assertReturnsIfImplemented(4, get_value, sem) 1169 1170 def test_bounded_semaphore(self): 1171 sem = self.BoundedSemaphore(2) 1172 self._test_semaphore(sem) 1173 # Currently fails on OS/X 1174 #if HAVE_GETVALUE: 1175 # self.assertRaises(ValueError, sem.release) 1176 # self.assertReturnsIfImplemented(2, get_value, sem) 1177 1178 def test_timeout(self): 1179 if self.TYPE != 'processes': 1180 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1181 1182 sem = self.Semaphore(0) 1183 acquire = TimingWrapper(sem.acquire) 1184 1185 self.assertEqual(acquire(False), False) 1186 self.assertTimingAlmostEqual(acquire.elapsed, 0.0) 1187 1188 self.assertEqual(acquire(False, None), False) 1189 self.assertTimingAlmostEqual(acquire.elapsed, 0.0) 1190 1191 self.assertEqual(acquire(False, TIMEOUT1), False) 1192 self.assertTimingAlmostEqual(acquire.elapsed, 0) 1193 1194 self.assertEqual(acquire(True, TIMEOUT2), False) 1195 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2) 1196 1197 self.assertEqual(acquire(timeout=TIMEOUT3), False) 1198 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3) 1199 1200 1201class _TestCondition(BaseTestCase): 1202 1203 @classmethod 1204 def f(cls, cond, sleeping, woken, timeout=None): 1205 cond.acquire() 1206 sleeping.release() 1207 cond.wait(timeout) 1208 woken.release() 1209 cond.release() 1210 1211 def assertReachesEventually(self, func, value): 1212 for i in range(10): 1213 try: 1214 if func() == value: 1215 break 1216 except NotImplementedError: 1217 break 1218 time.sleep(DELTA) 1219 time.sleep(DELTA) 1220 self.assertReturnsIfImplemented(value, func) 1221 1222 def check_invariant(self, cond): 1223 # this is only supposed to succeed when there are no sleepers 1224 if self.TYPE == 'processes': 1225 try: 1226 sleepers = (cond._sleeping_count.get_value() - 1227 cond._woken_count.get_value()) 1228 self.assertEqual(sleepers, 0) 1229 self.assertEqual(cond._wait_semaphore.get_value(), 0) 1230 except NotImplementedError: 1231 pass 1232 1233 def test_notify(self): 1234 cond = self.Condition() 1235 sleeping = self.Semaphore(0) 1236 woken = self.Semaphore(0) 1237 1238 p = self.Process(target=self.f, args=(cond, sleeping, woken)) 1239 p.daemon = True 1240 p.start() 1241 self.addCleanup(p.join) 1242 1243 p = threading.Thread(target=self.f, args=(cond, sleeping, woken)) 1244 p.daemon = True 1245 p.start() 1246 self.addCleanup(p.join) 1247 1248 # wait for both children to start sleeping 1249 sleeping.acquire() 1250 sleeping.acquire() 1251 1252 # check no process/thread has woken up 1253 time.sleep(DELTA) 1254 self.assertReturnsIfImplemented(0, get_value, woken) 1255 1256 # wake up one process/thread 1257 cond.acquire() 1258 cond.notify() 1259 cond.release() 1260 1261 # check one process/thread has woken up 1262 time.sleep(DELTA) 1263 self.assertReturnsIfImplemented(1, get_value, woken) 1264 1265 # wake up another 1266 cond.acquire() 1267 cond.notify() 1268 cond.release() 1269 1270 # check other has woken up 1271 time.sleep(DELTA) 1272 self.assertReturnsIfImplemented(2, get_value, woken) 1273 1274 # check state is not mucked up 1275 self.check_invariant(cond) 1276 p.join() 1277 1278 def test_notify_all(self): 1279 cond = self.Condition() 1280 sleeping = self.Semaphore(0) 1281 woken = self.Semaphore(0) 1282 1283 # start some threads/processes which will timeout 1284 for i in range(3): 1285 p = self.Process(target=self.f, 1286 args=(cond, sleeping, woken, TIMEOUT1)) 1287 p.daemon = True 1288 p.start() 1289 self.addCleanup(p.join) 1290 1291 t = threading.Thread(target=self.f, 1292 args=(cond, sleeping, woken, TIMEOUT1)) 1293 t.daemon = True 1294 t.start() 1295 self.addCleanup(t.join) 1296 1297 # wait for them all to sleep 1298 for i in range(6): 1299 sleeping.acquire() 1300 1301 # check they have all timed out 1302 for i in range(6): 1303 woken.acquire() 1304 self.assertReturnsIfImplemented(0, get_value, woken) 1305 1306 # check state is not mucked up 1307 self.check_invariant(cond) 1308 1309 # start some more threads/processes 1310 for i in range(3): 1311 p = self.Process(target=self.f, args=(cond, sleeping, woken)) 1312 p.daemon = True 1313 p.start() 1314 self.addCleanup(p.join) 1315 1316 t = threading.Thread(target=self.f, args=(cond, sleeping, woken)) 1317 t.daemon = True 1318 t.start() 1319 self.addCleanup(t.join) 1320 1321 # wait for them to all sleep 1322 for i in range(6): 1323 sleeping.acquire() 1324 1325 # check no process/thread has woken up 1326 time.sleep(DELTA) 1327 self.assertReturnsIfImplemented(0, get_value, woken) 1328 1329 # wake them all up 1330 cond.acquire() 1331 cond.notify_all() 1332 cond.release() 1333 1334 # check they have all woken 1335 self.assertReachesEventually(lambda: get_value(woken), 6) 1336 1337 # check state is not mucked up 1338 self.check_invariant(cond) 1339 1340 def test_notify_n(self): 1341 cond = self.Condition() 1342 sleeping = self.Semaphore(0) 1343 woken = self.Semaphore(0) 1344 1345 # start some threads/processes 1346 for i in range(3): 1347 p = self.Process(target=self.f, args=(cond, sleeping, woken)) 1348 p.daemon = True 1349 p.start() 1350 self.addCleanup(p.join) 1351 1352 t = threading.Thread(target=self.f, args=(cond, sleeping, woken)) 1353 t.daemon = True 1354 t.start() 1355 self.addCleanup(t.join) 1356 1357 # wait for them to all sleep 1358 for i in range(6): 1359 sleeping.acquire() 1360 1361 # check no process/thread has woken up 1362 time.sleep(DELTA) 1363 self.assertReturnsIfImplemented(0, get_value, woken) 1364 1365 # wake some of them up 1366 cond.acquire() 1367 cond.notify(n=2) 1368 cond.release() 1369 1370 # check 2 have woken 1371 self.assertReachesEventually(lambda: get_value(woken), 2) 1372 1373 # wake the rest of them 1374 cond.acquire() 1375 cond.notify(n=4) 1376 cond.release() 1377 1378 self.assertReachesEventually(lambda: get_value(woken), 6) 1379 1380 # doesn't do anything more 1381 cond.acquire() 1382 cond.notify(n=3) 1383 cond.release() 1384 1385 self.assertReturnsIfImplemented(6, get_value, woken) 1386 1387 # check state is not mucked up 1388 self.check_invariant(cond) 1389 1390 def test_timeout(self): 1391 cond = self.Condition() 1392 wait = TimingWrapper(cond.wait) 1393 cond.acquire() 1394 res = wait(TIMEOUT1) 1395 cond.release() 1396 self.assertEqual(res, False) 1397 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1) 1398 1399 @classmethod 1400 def _test_waitfor_f(cls, cond, state): 1401 with cond: 1402 state.value = 0 1403 cond.notify() 1404 result = cond.wait_for(lambda : state.value==4) 1405 if not result or state.value != 4: 1406 sys.exit(1) 1407 1408 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes') 1409 def test_waitfor(self): 1410 # based on test in test/lock_tests.py 1411 cond = self.Condition() 1412 state = self.Value('i', -1) 1413 1414 p = self.Process(target=self._test_waitfor_f, args=(cond, state)) 1415 p.daemon = True 1416 p.start() 1417 1418 with cond: 1419 result = cond.wait_for(lambda : state.value==0) 1420 self.assertTrue(result) 1421 self.assertEqual(state.value, 0) 1422 1423 for i in range(4): 1424 time.sleep(0.01) 1425 with cond: 1426 state.value += 1 1427 cond.notify() 1428 1429 join_process(p) 1430 self.assertEqual(p.exitcode, 0) 1431 1432 @classmethod 1433 def _test_waitfor_timeout_f(cls, cond, state, success, sem): 1434 sem.release() 1435 with cond: 1436 expected = 0.1 1437 dt = time.monotonic() 1438 result = cond.wait_for(lambda : state.value==4, timeout=expected) 1439 dt = time.monotonic() - dt 1440 # borrow logic in assertTimeout() from test/lock_tests.py 1441 if not result and expected * 0.6 < dt < expected * 10.0: 1442 success.value = True 1443 1444 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes') 1445 def test_waitfor_timeout(self): 1446 # based on test in test/lock_tests.py 1447 cond = self.Condition() 1448 state = self.Value('i', 0) 1449 success = self.Value('i', False) 1450 sem = self.Semaphore(0) 1451 1452 p = self.Process(target=self._test_waitfor_timeout_f, 1453 args=(cond, state, success, sem)) 1454 p.daemon = True 1455 p.start() 1456 self.assertTrue(sem.acquire(timeout=TIMEOUT)) 1457 1458 # Only increment 3 times, so state == 4 is never reached. 1459 for i in range(3): 1460 time.sleep(0.01) 1461 with cond: 1462 state.value += 1 1463 cond.notify() 1464 1465 join_process(p) 1466 self.assertTrue(success.value) 1467 1468 @classmethod 1469 def _test_wait_result(cls, c, pid): 1470 with c: 1471 c.notify() 1472 time.sleep(1) 1473 if pid is not None: 1474 os.kill(pid, signal.SIGINT) 1475 1476 def test_wait_result(self): 1477 if isinstance(self, ProcessesMixin) and sys.platform != 'win32': 1478 pid = os.getpid() 1479 else: 1480 pid = None 1481 1482 c = self.Condition() 1483 with c: 1484 self.assertFalse(c.wait(0)) 1485 self.assertFalse(c.wait(0.1)) 1486 1487 p = self.Process(target=self._test_wait_result, args=(c, pid)) 1488 p.start() 1489 1490 self.assertTrue(c.wait(60)) 1491 if pid is not None: 1492 self.assertRaises(KeyboardInterrupt, c.wait, 60) 1493 1494 p.join() 1495 1496 1497class _TestEvent(BaseTestCase): 1498 1499 @classmethod 1500 def _test_event(cls, event): 1501 time.sleep(TIMEOUT2) 1502 event.set() 1503 1504 def test_event(self): 1505 event = self.Event() 1506 wait = TimingWrapper(event.wait) 1507 1508 # Removed temporarily, due to API shear, this does not 1509 # work with threading._Event objects. is_set == isSet 1510 self.assertEqual(event.is_set(), False) 1511 1512 # Removed, threading.Event.wait() will return the value of the __flag 1513 # instead of None. API Shear with the semaphore backed mp.Event 1514 self.assertEqual(wait(0.0), False) 1515 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 1516 self.assertEqual(wait(TIMEOUT1), False) 1517 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1) 1518 1519 event.set() 1520 1521 # See note above on the API differences 1522 self.assertEqual(event.is_set(), True) 1523 self.assertEqual(wait(), True) 1524 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 1525 self.assertEqual(wait(TIMEOUT1), True) 1526 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 1527 # self.assertEqual(event.is_set(), True) 1528 1529 event.clear() 1530 1531 #self.assertEqual(event.is_set(), False) 1532 1533 p = self.Process(target=self._test_event, args=(event,)) 1534 p.daemon = True 1535 p.start() 1536 self.assertEqual(wait(), True) 1537 p.join() 1538 1539# 1540# Tests for Barrier - adapted from tests in test/lock_tests.py 1541# 1542 1543# Many of the tests for threading.Barrier use a list as an atomic 1544# counter: a value is appended to increment the counter, and the 1545# length of the list gives the value. We use the class DummyList 1546# for the same purpose. 1547 1548class _DummyList(object): 1549 1550 def __init__(self): 1551 wrapper = multiprocessing.heap.BufferWrapper(struct.calcsize('i')) 1552 lock = multiprocessing.Lock() 1553 self.__setstate__((wrapper, lock)) 1554 self._lengthbuf[0] = 0 1555 1556 def __setstate__(self, state): 1557 (self._wrapper, self._lock) = state 1558 self._lengthbuf = self._wrapper.create_memoryview().cast('i') 1559 1560 def __getstate__(self): 1561 return (self._wrapper, self._lock) 1562 1563 def append(self, _): 1564 with self._lock: 1565 self._lengthbuf[0] += 1 1566 1567 def __len__(self): 1568 with self._lock: 1569 return self._lengthbuf[0] 1570 1571def _wait(): 1572 # A crude wait/yield function not relying on synchronization primitives. 1573 time.sleep(0.01) 1574 1575 1576class Bunch(object): 1577 """ 1578 A bunch of threads. 1579 """ 1580 def __init__(self, namespace, f, args, n, wait_before_exit=False): 1581 """ 1582 Construct a bunch of `n` threads running the same function `f`. 1583 If `wait_before_exit` is True, the threads won't terminate until 1584 do_finish() is called. 1585 """ 1586 self.f = f 1587 self.args = args 1588 self.n = n 1589 self.started = namespace.DummyList() 1590 self.finished = namespace.DummyList() 1591 self._can_exit = namespace.Event() 1592 if not wait_before_exit: 1593 self._can_exit.set() 1594 1595 threads = [] 1596 for i in range(n): 1597 p = namespace.Process(target=self.task) 1598 p.daemon = True 1599 p.start() 1600 threads.append(p) 1601 1602 def finalize(threads): 1603 for p in threads: 1604 p.join() 1605 1606 self._finalizer = weakref.finalize(self, finalize, threads) 1607 1608 def task(self): 1609 pid = os.getpid() 1610 self.started.append(pid) 1611 try: 1612 self.f(*self.args) 1613 finally: 1614 self.finished.append(pid) 1615 self._can_exit.wait(30) 1616 assert self._can_exit.is_set() 1617 1618 def wait_for_started(self): 1619 while len(self.started) < self.n: 1620 _wait() 1621 1622 def wait_for_finished(self): 1623 while len(self.finished) < self.n: 1624 _wait() 1625 1626 def do_finish(self): 1627 self._can_exit.set() 1628 1629 def close(self): 1630 self._finalizer() 1631 1632 1633class AppendTrue(object): 1634 def __init__(self, obj): 1635 self.obj = obj 1636 def __call__(self): 1637 self.obj.append(True) 1638 1639 1640class _TestBarrier(BaseTestCase): 1641 """ 1642 Tests for Barrier objects. 1643 """ 1644 N = 5 1645 defaultTimeout = 30.0 # XXX Slow Windows buildbots need generous timeout 1646 1647 def setUp(self): 1648 self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout) 1649 1650 def tearDown(self): 1651 self.barrier.abort() 1652 self.barrier = None 1653 1654 def DummyList(self): 1655 if self.TYPE == 'threads': 1656 return [] 1657 elif self.TYPE == 'manager': 1658 return self.manager.list() 1659 else: 1660 return _DummyList() 1661 1662 def run_threads(self, f, args): 1663 b = Bunch(self, f, args, self.N-1) 1664 try: 1665 f(*args) 1666 b.wait_for_finished() 1667 finally: 1668 b.close() 1669 1670 @classmethod 1671 def multipass(cls, barrier, results, n): 1672 m = barrier.parties 1673 assert m == cls.N 1674 for i in range(n): 1675 results[0].append(True) 1676 assert len(results[1]) == i * m 1677 barrier.wait() 1678 results[1].append(True) 1679 assert len(results[0]) == (i + 1) * m 1680 barrier.wait() 1681 try: 1682 assert barrier.n_waiting == 0 1683 except NotImplementedError: 1684 pass 1685 assert not barrier.broken 1686 1687 def test_barrier(self, passes=1): 1688 """ 1689 Test that a barrier is passed in lockstep 1690 """ 1691 results = [self.DummyList(), self.DummyList()] 1692 self.run_threads(self.multipass, (self.barrier, results, passes)) 1693 1694 def test_barrier_10(self): 1695 """ 1696 Test that a barrier works for 10 consecutive runs 1697 """ 1698 return self.test_barrier(10) 1699 1700 @classmethod 1701 def _test_wait_return_f(cls, barrier, queue): 1702 res = barrier.wait() 1703 queue.put(res) 1704 1705 def test_wait_return(self): 1706 """ 1707 test the return value from barrier.wait 1708 """ 1709 queue = self.Queue() 1710 self.run_threads(self._test_wait_return_f, (self.barrier, queue)) 1711 results = [queue.get() for i in range(self.N)] 1712 self.assertEqual(results.count(0), 1) 1713 close_queue(queue) 1714 1715 @classmethod 1716 def _test_action_f(cls, barrier, results): 1717 barrier.wait() 1718 if len(results) != 1: 1719 raise RuntimeError 1720 1721 def test_action(self): 1722 """ 1723 Test the 'action' callback 1724 """ 1725 results = self.DummyList() 1726 barrier = self.Barrier(self.N, action=AppendTrue(results)) 1727 self.run_threads(self._test_action_f, (barrier, results)) 1728 self.assertEqual(len(results), 1) 1729 1730 @classmethod 1731 def _test_abort_f(cls, barrier, results1, results2): 1732 try: 1733 i = barrier.wait() 1734 if i == cls.N//2: 1735 raise RuntimeError 1736 barrier.wait() 1737 results1.append(True) 1738 except threading.BrokenBarrierError: 1739 results2.append(True) 1740 except RuntimeError: 1741 barrier.abort() 1742 1743 def test_abort(self): 1744 """ 1745 Test that an abort will put the barrier in a broken state 1746 """ 1747 results1 = self.DummyList() 1748 results2 = self.DummyList() 1749 self.run_threads(self._test_abort_f, 1750 (self.barrier, results1, results2)) 1751 self.assertEqual(len(results1), 0) 1752 self.assertEqual(len(results2), self.N-1) 1753 self.assertTrue(self.barrier.broken) 1754 1755 @classmethod 1756 def _test_reset_f(cls, barrier, results1, results2, results3): 1757 i = barrier.wait() 1758 if i == cls.N//2: 1759 # Wait until the other threads are all in the barrier. 1760 while barrier.n_waiting < cls.N-1: 1761 time.sleep(0.001) 1762 barrier.reset() 1763 else: 1764 try: 1765 barrier.wait() 1766 results1.append(True) 1767 except threading.BrokenBarrierError: 1768 results2.append(True) 1769 # Now, pass the barrier again 1770 barrier.wait() 1771 results3.append(True) 1772 1773 def test_reset(self): 1774 """ 1775 Test that a 'reset' on a barrier frees the waiting threads 1776 """ 1777 results1 = self.DummyList() 1778 results2 = self.DummyList() 1779 results3 = self.DummyList() 1780 self.run_threads(self._test_reset_f, 1781 (self.barrier, results1, results2, results3)) 1782 self.assertEqual(len(results1), 0) 1783 self.assertEqual(len(results2), self.N-1) 1784 self.assertEqual(len(results3), self.N) 1785 1786 @classmethod 1787 def _test_abort_and_reset_f(cls, barrier, barrier2, 1788 results1, results2, results3): 1789 try: 1790 i = barrier.wait() 1791 if i == cls.N//2: 1792 raise RuntimeError 1793 barrier.wait() 1794 results1.append(True) 1795 except threading.BrokenBarrierError: 1796 results2.append(True) 1797 except RuntimeError: 1798 barrier.abort() 1799 # Synchronize and reset the barrier. Must synchronize first so 1800 # that everyone has left it when we reset, and after so that no 1801 # one enters it before the reset. 1802 if barrier2.wait() == cls.N//2: 1803 barrier.reset() 1804 barrier2.wait() 1805 barrier.wait() 1806 results3.append(True) 1807 1808 def test_abort_and_reset(self): 1809 """ 1810 Test that a barrier can be reset after being broken. 1811 """ 1812 results1 = self.DummyList() 1813 results2 = self.DummyList() 1814 results3 = self.DummyList() 1815 barrier2 = self.Barrier(self.N) 1816 1817 self.run_threads(self._test_abort_and_reset_f, 1818 (self.barrier, barrier2, results1, results2, results3)) 1819 self.assertEqual(len(results1), 0) 1820 self.assertEqual(len(results2), self.N-1) 1821 self.assertEqual(len(results3), self.N) 1822 1823 @classmethod 1824 def _test_timeout_f(cls, barrier, results): 1825 i = barrier.wait() 1826 if i == cls.N//2: 1827 # One thread is late! 1828 time.sleep(1.0) 1829 try: 1830 barrier.wait(0.5) 1831 except threading.BrokenBarrierError: 1832 results.append(True) 1833 1834 def test_timeout(self): 1835 """ 1836 Test wait(timeout) 1837 """ 1838 results = self.DummyList() 1839 self.run_threads(self._test_timeout_f, (self.barrier, results)) 1840 self.assertEqual(len(results), self.barrier.parties) 1841 1842 @classmethod 1843 def _test_default_timeout_f(cls, barrier, results): 1844 i = barrier.wait(cls.defaultTimeout) 1845 if i == cls.N//2: 1846 # One thread is later than the default timeout 1847 time.sleep(1.0) 1848 try: 1849 barrier.wait() 1850 except threading.BrokenBarrierError: 1851 results.append(True) 1852 1853 def test_default_timeout(self): 1854 """ 1855 Test the barrier's default timeout 1856 """ 1857 barrier = self.Barrier(self.N, timeout=0.5) 1858 results = self.DummyList() 1859 self.run_threads(self._test_default_timeout_f, (barrier, results)) 1860 self.assertEqual(len(results), barrier.parties) 1861 1862 def test_single_thread(self): 1863 b = self.Barrier(1) 1864 b.wait() 1865 b.wait() 1866 1867 @classmethod 1868 def _test_thousand_f(cls, barrier, passes, conn, lock): 1869 for i in range(passes): 1870 barrier.wait() 1871 with lock: 1872 conn.send(i) 1873 1874 def test_thousand(self): 1875 if self.TYPE == 'manager': 1876 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1877 passes = 1000 1878 lock = self.Lock() 1879 conn, child_conn = self.Pipe(False) 1880 for j in range(self.N): 1881 p = self.Process(target=self._test_thousand_f, 1882 args=(self.barrier, passes, child_conn, lock)) 1883 p.start() 1884 self.addCleanup(p.join) 1885 1886 for i in range(passes): 1887 for j in range(self.N): 1888 self.assertEqual(conn.recv(), i) 1889 1890# 1891# 1892# 1893 1894class _TestValue(BaseTestCase): 1895 1896 ALLOWED_TYPES = ('processes',) 1897 1898 codes_values = [ 1899 ('i', 4343, 24234), 1900 ('d', 3.625, -4.25), 1901 ('h', -232, 234), 1902 ('q', 2 ** 33, 2 ** 34), 1903 ('c', latin('x'), latin('y')) 1904 ] 1905 1906 def setUp(self): 1907 if not HAS_SHAREDCTYPES: 1908 self.skipTest("requires multiprocessing.sharedctypes") 1909 1910 @classmethod 1911 def _test(cls, values): 1912 for sv, cv in zip(values, cls.codes_values): 1913 sv.value = cv[2] 1914 1915 1916 def test_value(self, raw=False): 1917 if raw: 1918 values = [self.RawValue(code, value) 1919 for code, value, _ in self.codes_values] 1920 else: 1921 values = [self.Value(code, value) 1922 for code, value, _ in self.codes_values] 1923 1924 for sv, cv in zip(values, self.codes_values): 1925 self.assertEqual(sv.value, cv[1]) 1926 1927 proc = self.Process(target=self._test, args=(values,)) 1928 proc.daemon = True 1929 proc.start() 1930 proc.join() 1931 1932 for sv, cv in zip(values, self.codes_values): 1933 self.assertEqual(sv.value, cv[2]) 1934 1935 def test_rawvalue(self): 1936 self.test_value(raw=True) 1937 1938 def test_getobj_getlock(self): 1939 val1 = self.Value('i', 5) 1940 lock1 = val1.get_lock() 1941 obj1 = val1.get_obj() 1942 1943 val2 = self.Value('i', 5, lock=None) 1944 lock2 = val2.get_lock() 1945 obj2 = val2.get_obj() 1946 1947 lock = self.Lock() 1948 val3 = self.Value('i', 5, lock=lock) 1949 lock3 = val3.get_lock() 1950 obj3 = val3.get_obj() 1951 self.assertEqual(lock, lock3) 1952 1953 arr4 = self.Value('i', 5, lock=False) 1954 self.assertFalse(hasattr(arr4, 'get_lock')) 1955 self.assertFalse(hasattr(arr4, 'get_obj')) 1956 1957 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue') 1958 1959 arr5 = self.RawValue('i', 5) 1960 self.assertFalse(hasattr(arr5, 'get_lock')) 1961 self.assertFalse(hasattr(arr5, 'get_obj')) 1962 1963 1964class _TestArray(BaseTestCase): 1965 1966 ALLOWED_TYPES = ('processes',) 1967 1968 @classmethod 1969 def f(cls, seq): 1970 for i in range(1, len(seq)): 1971 seq[i] += seq[i-1] 1972 1973 @unittest.skipIf(c_int is None, "requires _ctypes") 1974 def test_array(self, raw=False): 1975 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831] 1976 if raw: 1977 arr = self.RawArray('i', seq) 1978 else: 1979 arr = self.Array('i', seq) 1980 1981 self.assertEqual(len(arr), len(seq)) 1982 self.assertEqual(arr[3], seq[3]) 1983 self.assertEqual(list(arr[2:7]), list(seq[2:7])) 1984 1985 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4]) 1986 1987 self.assertEqual(list(arr[:]), seq) 1988 1989 self.f(seq) 1990 1991 p = self.Process(target=self.f, args=(arr,)) 1992 p.daemon = True 1993 p.start() 1994 p.join() 1995 1996 self.assertEqual(list(arr[:]), seq) 1997 1998 @unittest.skipIf(c_int is None, "requires _ctypes") 1999 def test_array_from_size(self): 2000 size = 10 2001 # Test for zeroing (see issue #11675). 2002 # The repetition below strengthens the test by increasing the chances 2003 # of previously allocated non-zero memory being used for the new array 2004 # on the 2nd and 3rd loops. 2005 for _ in range(3): 2006 arr = self.Array('i', size) 2007 self.assertEqual(len(arr), size) 2008 self.assertEqual(list(arr), [0] * size) 2009 arr[:] = range(10) 2010 self.assertEqual(list(arr), list(range(10))) 2011 del arr 2012 2013 @unittest.skipIf(c_int is None, "requires _ctypes") 2014 def test_rawarray(self): 2015 self.test_array(raw=True) 2016 2017 @unittest.skipIf(c_int is None, "requires _ctypes") 2018 def test_getobj_getlock_obj(self): 2019 arr1 = self.Array('i', list(range(10))) 2020 lock1 = arr1.get_lock() 2021 obj1 = arr1.get_obj() 2022 2023 arr2 = self.Array('i', list(range(10)), lock=None) 2024 lock2 = arr2.get_lock() 2025 obj2 = arr2.get_obj() 2026 2027 lock = self.Lock() 2028 arr3 = self.Array('i', list(range(10)), lock=lock) 2029 lock3 = arr3.get_lock() 2030 obj3 = arr3.get_obj() 2031 self.assertEqual(lock, lock3) 2032 2033 arr4 = self.Array('i', range(10), lock=False) 2034 self.assertFalse(hasattr(arr4, 'get_lock')) 2035 self.assertFalse(hasattr(arr4, 'get_obj')) 2036 self.assertRaises(AttributeError, 2037 self.Array, 'i', range(10), lock='notalock') 2038 2039 arr5 = self.RawArray('i', range(10)) 2040 self.assertFalse(hasattr(arr5, 'get_lock')) 2041 self.assertFalse(hasattr(arr5, 'get_obj')) 2042 2043# 2044# 2045# 2046 2047class _TestContainers(BaseTestCase): 2048 2049 ALLOWED_TYPES = ('manager',) 2050 2051 def test_list(self): 2052 a = self.list(list(range(10))) 2053 self.assertEqual(a[:], list(range(10))) 2054 2055 b = self.list() 2056 self.assertEqual(b[:], []) 2057 2058 b.extend(list(range(5))) 2059 self.assertEqual(b[:], list(range(5))) 2060 2061 self.assertEqual(b[2], 2) 2062 self.assertEqual(b[2:10], [2,3,4]) 2063 2064 b *= 2 2065 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]) 2066 2067 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6]) 2068 2069 self.assertEqual(a[:], list(range(10))) 2070 2071 d = [a, b] 2072 e = self.list(d) 2073 self.assertEqual( 2074 [element[:] for element in e], 2075 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]] 2076 ) 2077 2078 f = self.list([a]) 2079 a.append('hello') 2080 self.assertEqual(f[0][:], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']) 2081 2082 def test_list_iter(self): 2083 a = self.list(list(range(10))) 2084 it = iter(a) 2085 self.assertEqual(list(it), list(range(10))) 2086 self.assertEqual(list(it), []) # exhausted 2087 # list modified during iteration 2088 it = iter(a) 2089 a[0] = 100 2090 self.assertEqual(next(it), 100) 2091 2092 def test_list_proxy_in_list(self): 2093 a = self.list([self.list(range(3)) for _i in range(3)]) 2094 self.assertEqual([inner[:] for inner in a], [[0, 1, 2]] * 3) 2095 2096 a[0][-1] = 55 2097 self.assertEqual(a[0][:], [0, 1, 55]) 2098 for i in range(1, 3): 2099 self.assertEqual(a[i][:], [0, 1, 2]) 2100 2101 self.assertEqual(a[1].pop(), 2) 2102 self.assertEqual(len(a[1]), 2) 2103 for i in range(0, 3, 2): 2104 self.assertEqual(len(a[i]), 3) 2105 2106 del a 2107 2108 b = self.list() 2109 b.append(b) 2110 del b 2111 2112 def test_dict(self): 2113 d = self.dict() 2114 indices = list(range(65, 70)) 2115 for i in indices: 2116 d[i] = chr(i) 2117 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices)) 2118 self.assertEqual(sorted(d.keys()), indices) 2119 self.assertEqual(sorted(d.values()), [chr(i) for i in indices]) 2120 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices]) 2121 2122 def test_dict_iter(self): 2123 d = self.dict() 2124 indices = list(range(65, 70)) 2125 for i in indices: 2126 d[i] = chr(i) 2127 it = iter(d) 2128 self.assertEqual(list(it), indices) 2129 self.assertEqual(list(it), []) # exhausted 2130 # dictionary changed size during iteration 2131 it = iter(d) 2132 d.clear() 2133 self.assertRaises(RuntimeError, next, it) 2134 2135 def test_dict_proxy_nested(self): 2136 pets = self.dict(ferrets=2, hamsters=4) 2137 supplies = self.dict(water=10, feed=3) 2138 d = self.dict(pets=pets, supplies=supplies) 2139 2140 self.assertEqual(supplies['water'], 10) 2141 self.assertEqual(d['supplies']['water'], 10) 2142 2143 d['supplies']['blankets'] = 5 2144 self.assertEqual(supplies['blankets'], 5) 2145 self.assertEqual(d['supplies']['blankets'], 5) 2146 2147 d['supplies']['water'] = 7 2148 self.assertEqual(supplies['water'], 7) 2149 self.assertEqual(d['supplies']['water'], 7) 2150 2151 del pets 2152 del supplies 2153 self.assertEqual(d['pets']['ferrets'], 2) 2154 d['supplies']['blankets'] = 11 2155 self.assertEqual(d['supplies']['blankets'], 11) 2156 2157 pets = d['pets'] 2158 supplies = d['supplies'] 2159 supplies['water'] = 7 2160 self.assertEqual(supplies['water'], 7) 2161 self.assertEqual(d['supplies']['water'], 7) 2162 2163 d.clear() 2164 self.assertEqual(len(d), 0) 2165 self.assertEqual(supplies['water'], 7) 2166 self.assertEqual(pets['hamsters'], 4) 2167 2168 l = self.list([pets, supplies]) 2169 l[0]['marmots'] = 1 2170 self.assertEqual(pets['marmots'], 1) 2171 self.assertEqual(l[0]['marmots'], 1) 2172 2173 del pets 2174 del supplies 2175 self.assertEqual(l[0]['marmots'], 1) 2176 2177 outer = self.list([[88, 99], l]) 2178 self.assertIsInstance(outer[0], list) # Not a ListProxy 2179 self.assertEqual(outer[-1][-1]['feed'], 3) 2180 2181 def test_namespace(self): 2182 n = self.Namespace() 2183 n.name = 'Bob' 2184 n.job = 'Builder' 2185 n._hidden = 'hidden' 2186 self.assertEqual((n.name, n.job), ('Bob', 'Builder')) 2187 del n.job 2188 self.assertEqual(str(n), "Namespace(name='Bob')") 2189 self.assertTrue(hasattr(n, 'name')) 2190 self.assertTrue(not hasattr(n, 'job')) 2191 2192# 2193# 2194# 2195 2196def sqr(x, wait=0.0): 2197 time.sleep(wait) 2198 return x*x 2199 2200def mul(x, y): 2201 return x*y 2202 2203def raise_large_valuerror(wait): 2204 time.sleep(wait) 2205 raise ValueError("x" * 1024**2) 2206 2207def identity(x): 2208 return x 2209 2210class CountedObject(object): 2211 n_instances = 0 2212 2213 def __new__(cls): 2214 cls.n_instances += 1 2215 return object.__new__(cls) 2216 2217 def __del__(self): 2218 type(self).n_instances -= 1 2219 2220class SayWhenError(ValueError): pass 2221 2222def exception_throwing_generator(total, when): 2223 if when == -1: 2224 raise SayWhenError("Somebody said when") 2225 for i in range(total): 2226 if i == when: 2227 raise SayWhenError("Somebody said when") 2228 yield i 2229 2230 2231class _TestPool(BaseTestCase): 2232 2233 @classmethod 2234 def setUpClass(cls): 2235 super().setUpClass() 2236 cls.pool = cls.Pool(4) 2237 2238 @classmethod 2239 def tearDownClass(cls): 2240 cls.pool.terminate() 2241 cls.pool.join() 2242 cls.pool = None 2243 super().tearDownClass() 2244 2245 def test_apply(self): 2246 papply = self.pool.apply 2247 self.assertEqual(papply(sqr, (5,)), sqr(5)) 2248 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3)) 2249 2250 def test_map(self): 2251 pmap = self.pool.map 2252 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10))))) 2253 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20), 2254 list(map(sqr, list(range(100))))) 2255 2256 def test_starmap(self): 2257 psmap = self.pool.starmap 2258 tuples = list(zip(range(10), range(9,-1, -1))) 2259 self.assertEqual(psmap(mul, tuples), 2260 list(itertools.starmap(mul, tuples))) 2261 tuples = list(zip(range(100), range(99,-1, -1))) 2262 self.assertEqual(psmap(mul, tuples, chunksize=20), 2263 list(itertools.starmap(mul, tuples))) 2264 2265 def test_starmap_async(self): 2266 tuples = list(zip(range(100), range(99,-1, -1))) 2267 self.assertEqual(self.pool.starmap_async(mul, tuples).get(), 2268 list(itertools.starmap(mul, tuples))) 2269 2270 def test_map_async(self): 2271 self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(), 2272 list(map(sqr, list(range(10))))) 2273 2274 def test_map_async_callbacks(self): 2275 call_args = self.manager.list() if self.TYPE == 'manager' else [] 2276 self.pool.map_async(int, ['1'], 2277 callback=call_args.append, 2278 error_callback=call_args.append).wait() 2279 self.assertEqual(1, len(call_args)) 2280 self.assertEqual([1], call_args[0]) 2281 self.pool.map_async(int, ['a'], 2282 callback=call_args.append, 2283 error_callback=call_args.append).wait() 2284 self.assertEqual(2, len(call_args)) 2285 self.assertIsInstance(call_args[1], ValueError) 2286 2287 def test_map_unplicklable(self): 2288 # Issue #19425 -- failure to pickle should not cause a hang 2289 if self.TYPE == 'threads': 2290 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 2291 class A(object): 2292 def __reduce__(self): 2293 raise RuntimeError('cannot pickle') 2294 with self.assertRaises(RuntimeError): 2295 self.pool.map(sqr, [A()]*10) 2296 2297 def test_map_chunksize(self): 2298 try: 2299 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1) 2300 except multiprocessing.TimeoutError: 2301 self.fail("pool.map_async with chunksize stalled on null list") 2302 2303 def test_map_handle_iterable_exception(self): 2304 if self.TYPE == 'manager': 2305 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 2306 2307 # SayWhenError seen at the very first of the iterable 2308 with self.assertRaises(SayWhenError): 2309 self.pool.map(sqr, exception_throwing_generator(1, -1), 1) 2310 # again, make sure it's reentrant 2311 with self.assertRaises(SayWhenError): 2312 self.pool.map(sqr, exception_throwing_generator(1, -1), 1) 2313 2314 with self.assertRaises(SayWhenError): 2315 self.pool.map(sqr, exception_throwing_generator(10, 3), 1) 2316 2317 class SpecialIterable: 2318 def __iter__(self): 2319 return self 2320 def __next__(self): 2321 raise SayWhenError 2322 def __len__(self): 2323 return 1 2324 with self.assertRaises(SayWhenError): 2325 self.pool.map(sqr, SpecialIterable(), 1) 2326 with self.assertRaises(SayWhenError): 2327 self.pool.map(sqr, SpecialIterable(), 1) 2328 2329 def test_async(self): 2330 res = self.pool.apply_async(sqr, (7, TIMEOUT1,)) 2331 get = TimingWrapper(res.get) 2332 self.assertEqual(get(), 49) 2333 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1) 2334 2335 def test_async_timeout(self): 2336 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0)) 2337 get = TimingWrapper(res.get) 2338 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2) 2339 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2) 2340 2341 def test_imap(self): 2342 it = self.pool.imap(sqr, list(range(10))) 2343 self.assertEqual(list(it), list(map(sqr, list(range(10))))) 2344 2345 it = self.pool.imap(sqr, list(range(10))) 2346 for i in range(10): 2347 self.assertEqual(next(it), i*i) 2348 self.assertRaises(StopIteration, it.__next__) 2349 2350 it = self.pool.imap(sqr, list(range(1000)), chunksize=100) 2351 for i in range(1000): 2352 self.assertEqual(next(it), i*i) 2353 self.assertRaises(StopIteration, it.__next__) 2354 2355 def test_imap_handle_iterable_exception(self): 2356 if self.TYPE == 'manager': 2357 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 2358 2359 # SayWhenError seen at the very first of the iterable 2360 it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1) 2361 self.assertRaises(SayWhenError, it.__next__) 2362 # again, make sure it's reentrant 2363 it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1) 2364 self.assertRaises(SayWhenError, it.__next__) 2365 2366 it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1) 2367 for i in range(3): 2368 self.assertEqual(next(it), i*i) 2369 self.assertRaises(SayWhenError, it.__next__) 2370 2371 # SayWhenError seen at start of problematic chunk's results 2372 it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2) 2373 for i in range(6): 2374 self.assertEqual(next(it), i*i) 2375 self.assertRaises(SayWhenError, it.__next__) 2376 it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4) 2377 for i in range(4): 2378 self.assertEqual(next(it), i*i) 2379 self.assertRaises(SayWhenError, it.__next__) 2380 2381 def test_imap_unordered(self): 2382 it = self.pool.imap_unordered(sqr, list(range(10))) 2383 self.assertEqual(sorted(it), list(map(sqr, list(range(10))))) 2384 2385 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=100) 2386 self.assertEqual(sorted(it), list(map(sqr, list(range(1000))))) 2387 2388 def test_imap_unordered_handle_iterable_exception(self): 2389 if self.TYPE == 'manager': 2390 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 2391 2392 # SayWhenError seen at the very first of the iterable 2393 it = self.pool.imap_unordered(sqr, 2394 exception_throwing_generator(1, -1), 2395 1) 2396 self.assertRaises(SayWhenError, it.__next__) 2397 # again, make sure it's reentrant 2398 it = self.pool.imap_unordered(sqr, 2399 exception_throwing_generator(1, -1), 2400 1) 2401 self.assertRaises(SayWhenError, it.__next__) 2402 2403 it = self.pool.imap_unordered(sqr, 2404 exception_throwing_generator(10, 3), 2405 1) 2406 expected_values = list(map(sqr, list(range(10)))) 2407 with self.assertRaises(SayWhenError): 2408 # imap_unordered makes it difficult to anticipate the SayWhenError 2409 for i in range(10): 2410 value = next(it) 2411 self.assertIn(value, expected_values) 2412 expected_values.remove(value) 2413 2414 it = self.pool.imap_unordered(sqr, 2415 exception_throwing_generator(20, 7), 2416 2) 2417 expected_values = list(map(sqr, list(range(20)))) 2418 with self.assertRaises(SayWhenError): 2419 for i in range(20): 2420 value = next(it) 2421 self.assertIn(value, expected_values) 2422 expected_values.remove(value) 2423 2424 def test_make_pool(self): 2425 expected_error = (RemoteError if self.TYPE == 'manager' 2426 else ValueError) 2427 2428 self.assertRaises(expected_error, self.Pool, -1) 2429 self.assertRaises(expected_error, self.Pool, 0) 2430 2431 if self.TYPE != 'manager': 2432 p = self.Pool(3) 2433 try: 2434 self.assertEqual(3, len(p._pool)) 2435 finally: 2436 p.close() 2437 p.join() 2438 2439 def test_terminate(self): 2440 result = self.pool.map_async( 2441 time.sleep, [0.1 for i in range(10000)], chunksize=1 2442 ) 2443 self.pool.terminate() 2444 join = TimingWrapper(self.pool.join) 2445 join() 2446 # Sanity check the pool didn't wait for all tasks to finish 2447 self.assertLess(join.elapsed, 2.0) 2448 2449 def test_empty_iterable(self): 2450 # See Issue 12157 2451 p = self.Pool(1) 2452 2453 self.assertEqual(p.map(sqr, []), []) 2454 self.assertEqual(list(p.imap(sqr, [])), []) 2455 self.assertEqual(list(p.imap_unordered(sqr, [])), []) 2456 self.assertEqual(p.map_async(sqr, []).get(), []) 2457 2458 p.close() 2459 p.join() 2460 2461 def test_context(self): 2462 if self.TYPE == 'processes': 2463 L = list(range(10)) 2464 expected = [sqr(i) for i in L] 2465 with self.Pool(2) as p: 2466 r = p.map_async(sqr, L) 2467 self.assertEqual(r.get(), expected) 2468 p.join() 2469 self.assertRaises(ValueError, p.map_async, sqr, L) 2470 2471 @classmethod 2472 def _test_traceback(cls): 2473 raise RuntimeError(123) # some comment 2474 2475 def test_traceback(self): 2476 # We want ensure that the traceback from the child process is 2477 # contained in the traceback raised in the main process. 2478 if self.TYPE == 'processes': 2479 with self.Pool(1) as p: 2480 try: 2481 p.apply(self._test_traceback) 2482 except Exception as e: 2483 exc = e 2484 else: 2485 self.fail('expected RuntimeError') 2486 p.join() 2487 self.assertIs(type(exc), RuntimeError) 2488 self.assertEqual(exc.args, (123,)) 2489 cause = exc.__cause__ 2490 self.assertIs(type(cause), multiprocessing.pool.RemoteTraceback) 2491 self.assertIn('raise RuntimeError(123) # some comment', cause.tb) 2492 2493 with test.support.captured_stderr() as f1: 2494 try: 2495 raise exc 2496 except RuntimeError: 2497 sys.excepthook(*sys.exc_info()) 2498 self.assertIn('raise RuntimeError(123) # some comment', 2499 f1.getvalue()) 2500 # _helper_reraises_exception should not make the error 2501 # a remote exception 2502 with self.Pool(1) as p: 2503 try: 2504 p.map(sqr, exception_throwing_generator(1, -1), 1) 2505 except Exception as e: 2506 exc = e 2507 else: 2508 self.fail('expected SayWhenError') 2509 self.assertIs(type(exc), SayWhenError) 2510 self.assertIs(exc.__cause__, None) 2511 p.join() 2512 2513 @classmethod 2514 def _test_wrapped_exception(cls): 2515 raise RuntimeError('foo') 2516 2517 def test_wrapped_exception(self): 2518 # Issue #20980: Should not wrap exception when using thread pool 2519 with self.Pool(1) as p: 2520 with self.assertRaises(RuntimeError): 2521 p.apply(self._test_wrapped_exception) 2522 p.join() 2523 2524 def test_map_no_failfast(self): 2525 # Issue #23992: the fail-fast behaviour when an exception is raised 2526 # during map() would make Pool.join() deadlock, because a worker 2527 # process would fill the result queue (after the result handler thread 2528 # terminated, hence not draining it anymore). 2529 2530 t_start = time.monotonic() 2531 2532 with self.assertRaises(ValueError): 2533 with self.Pool(2) as p: 2534 try: 2535 p.map(raise_large_valuerror, [0, 1]) 2536 finally: 2537 time.sleep(0.5) 2538 p.close() 2539 p.join() 2540 2541 # check that we indeed waited for all jobs 2542 self.assertGreater(time.monotonic() - t_start, 0.9) 2543 2544 def test_release_task_refs(self): 2545 # Issue #29861: task arguments and results should not be kept 2546 # alive after we are done with them. 2547 objs = [CountedObject() for i in range(10)] 2548 refs = [weakref.ref(o) for o in objs] 2549 self.pool.map(identity, objs) 2550 2551 del objs 2552 time.sleep(DELTA) # let threaded cleanup code run 2553 self.assertEqual(set(wr() for wr in refs), {None}) 2554 # With a process pool, copies of the objects are returned, check 2555 # they were released too. 2556 self.assertEqual(CountedObject.n_instances, 0) 2557 2558 2559def raising(): 2560 raise KeyError("key") 2561 2562def unpickleable_result(): 2563 return lambda: 42 2564 2565class _TestPoolWorkerErrors(BaseTestCase): 2566 ALLOWED_TYPES = ('processes', ) 2567 2568 def test_async_error_callback(self): 2569 p = multiprocessing.Pool(2) 2570 2571 scratchpad = [None] 2572 def errback(exc): 2573 scratchpad[0] = exc 2574 2575 res = p.apply_async(raising, error_callback=errback) 2576 self.assertRaises(KeyError, res.get) 2577 self.assertTrue(scratchpad[0]) 2578 self.assertIsInstance(scratchpad[0], KeyError) 2579 2580 p.close() 2581 p.join() 2582 2583 def test_unpickleable_result(self): 2584 from multiprocessing.pool import MaybeEncodingError 2585 p = multiprocessing.Pool(2) 2586 2587 # Make sure we don't lose pool processes because of encoding errors. 2588 for iteration in range(20): 2589 2590 scratchpad = [None] 2591 def errback(exc): 2592 scratchpad[0] = exc 2593 2594 res = p.apply_async(unpickleable_result, error_callback=errback) 2595 self.assertRaises(MaybeEncodingError, res.get) 2596 wrapped = scratchpad[0] 2597 self.assertTrue(wrapped) 2598 self.assertIsInstance(scratchpad[0], MaybeEncodingError) 2599 self.assertIsNotNone(wrapped.exc) 2600 self.assertIsNotNone(wrapped.value) 2601 2602 p.close() 2603 p.join() 2604 2605class _TestPoolWorkerLifetime(BaseTestCase): 2606 ALLOWED_TYPES = ('processes', ) 2607 2608 def test_pool_worker_lifetime(self): 2609 p = multiprocessing.Pool(3, maxtasksperchild=10) 2610 self.assertEqual(3, len(p._pool)) 2611 origworkerpids = [w.pid for w in p._pool] 2612 # Run many tasks so each worker gets replaced (hopefully) 2613 results = [] 2614 for i in range(100): 2615 results.append(p.apply_async(sqr, (i, ))) 2616 # Fetch the results and verify we got the right answers, 2617 # also ensuring all the tasks have completed. 2618 for (j, res) in enumerate(results): 2619 self.assertEqual(res.get(), sqr(j)) 2620 # Refill the pool 2621 p._repopulate_pool() 2622 # Wait until all workers are alive 2623 # (countdown * DELTA = 5 seconds max startup process time) 2624 countdown = 50 2625 while countdown and not all(w.is_alive() for w in p._pool): 2626 countdown -= 1 2627 time.sleep(DELTA) 2628 finalworkerpids = [w.pid for w in p._pool] 2629 # All pids should be assigned. See issue #7805. 2630 self.assertNotIn(None, origworkerpids) 2631 self.assertNotIn(None, finalworkerpids) 2632 # Finally, check that the worker pids have changed 2633 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids)) 2634 p.close() 2635 p.join() 2636 2637 def test_pool_worker_lifetime_early_close(self): 2638 # Issue #10332: closing a pool whose workers have limited lifetimes 2639 # before all the tasks completed would make join() hang. 2640 p = multiprocessing.Pool(3, maxtasksperchild=1) 2641 results = [] 2642 for i in range(6): 2643 results.append(p.apply_async(sqr, (i, 0.3))) 2644 p.close() 2645 p.join() 2646 # check the results 2647 for (j, res) in enumerate(results): 2648 self.assertEqual(res.get(), sqr(j)) 2649 2650# 2651# Test of creating a customized manager class 2652# 2653 2654from multiprocessing.managers import BaseManager, BaseProxy, RemoteError 2655 2656class FooBar(object): 2657 def f(self): 2658 return 'f()' 2659 def g(self): 2660 raise ValueError 2661 def _h(self): 2662 return '_h()' 2663 2664def baz(): 2665 for i in range(10): 2666 yield i*i 2667 2668class IteratorProxy(BaseProxy): 2669 _exposed_ = ('__next__',) 2670 def __iter__(self): 2671 return self 2672 def __next__(self): 2673 return self._callmethod('__next__') 2674 2675class MyManager(BaseManager): 2676 pass 2677 2678MyManager.register('Foo', callable=FooBar) 2679MyManager.register('Bar', callable=FooBar, exposed=('f', '_h')) 2680MyManager.register('baz', callable=baz, proxytype=IteratorProxy) 2681 2682 2683class _TestMyManager(BaseTestCase): 2684 2685 ALLOWED_TYPES = ('manager',) 2686 2687 def test_mymanager(self): 2688 manager = MyManager() 2689 manager.start() 2690 self.common(manager) 2691 manager.shutdown() 2692 2693 # If the manager process exited cleanly then the exitcode 2694 # will be zero. Otherwise (after a short timeout) 2695 # terminate() is used, resulting in an exitcode of -SIGTERM. 2696 self.assertEqual(manager._process.exitcode, 0) 2697 2698 def test_mymanager_context(self): 2699 with MyManager() as manager: 2700 self.common(manager) 2701 # bpo-30356: BaseManager._finalize_manager() sends SIGTERM 2702 # to the manager process if it takes longer than 1 second to stop. 2703 self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM)) 2704 2705 def test_mymanager_context_prestarted(self): 2706 manager = MyManager() 2707 manager.start() 2708 with manager: 2709 self.common(manager) 2710 self.assertEqual(manager._process.exitcode, 0) 2711 2712 def common(self, manager): 2713 foo = manager.Foo() 2714 bar = manager.Bar() 2715 baz = manager.baz() 2716 2717 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)] 2718 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)] 2719 2720 self.assertEqual(foo_methods, ['f', 'g']) 2721 self.assertEqual(bar_methods, ['f', '_h']) 2722 2723 self.assertEqual(foo.f(), 'f()') 2724 self.assertRaises(ValueError, foo.g) 2725 self.assertEqual(foo._callmethod('f'), 'f()') 2726 self.assertRaises(RemoteError, foo._callmethod, '_h') 2727 2728 self.assertEqual(bar.f(), 'f()') 2729 self.assertEqual(bar._h(), '_h()') 2730 self.assertEqual(bar._callmethod('f'), 'f()') 2731 self.assertEqual(bar._callmethod('_h'), '_h()') 2732 2733 self.assertEqual(list(baz), [i*i for i in range(10)]) 2734 2735 2736# 2737# Test of connecting to a remote server and using xmlrpclib for serialization 2738# 2739 2740_queue = pyqueue.Queue() 2741def get_queue(): 2742 return _queue 2743 2744class QueueManager(BaseManager): 2745 '''manager class used by server process''' 2746QueueManager.register('get_queue', callable=get_queue) 2747 2748class QueueManager2(BaseManager): 2749 '''manager class which specifies the same interface as QueueManager''' 2750QueueManager2.register('get_queue') 2751 2752 2753SERIALIZER = 'xmlrpclib' 2754 2755class _TestRemoteManager(BaseTestCase): 2756 2757 ALLOWED_TYPES = ('manager',) 2758 values = ['hello world', None, True, 2.25, 2759 'hall\xe5 v\xe4rlden', 2760 '\u043f\u0440\u0438\u0432\u0456\u0442 \u0441\u0432\u0456\u0442', 2761 b'hall\xe5 v\xe4rlden', 2762 ] 2763 result = values[:] 2764 2765 @classmethod 2766 def _putter(cls, address, authkey): 2767 manager = QueueManager2( 2768 address=address, authkey=authkey, serializer=SERIALIZER 2769 ) 2770 manager.connect() 2771 queue = manager.get_queue() 2772 # Note that xmlrpclib will deserialize object as a list not a tuple 2773 queue.put(tuple(cls.values)) 2774 2775 def test_remote(self): 2776 authkey = os.urandom(32) 2777 2778 manager = QueueManager( 2779 address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER 2780 ) 2781 manager.start() 2782 self.addCleanup(manager.shutdown) 2783 2784 p = self.Process(target=self._putter, args=(manager.address, authkey)) 2785 p.daemon = True 2786 p.start() 2787 2788 manager2 = QueueManager2( 2789 address=manager.address, authkey=authkey, serializer=SERIALIZER 2790 ) 2791 manager2.connect() 2792 queue = manager2.get_queue() 2793 2794 self.assertEqual(queue.get(), self.result) 2795 2796 # Because we are using xmlrpclib for serialization instead of 2797 # pickle this will cause a serialization error. 2798 self.assertRaises(Exception, queue.put, time.sleep) 2799 2800 # Make queue finalizer run before the server is stopped 2801 del queue 2802 2803class _TestManagerRestart(BaseTestCase): 2804 2805 @classmethod 2806 def _putter(cls, address, authkey): 2807 manager = QueueManager( 2808 address=address, authkey=authkey, serializer=SERIALIZER) 2809 manager.connect() 2810 queue = manager.get_queue() 2811 queue.put('hello world') 2812 2813 def test_rapid_restart(self): 2814 authkey = os.urandom(32) 2815 manager = QueueManager( 2816 address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER) 2817 try: 2818 srvr = manager.get_server() 2819 addr = srvr.address 2820 # Close the connection.Listener socket which gets opened as a part 2821 # of manager.get_server(). It's not needed for the test. 2822 srvr.listener.close() 2823 manager.start() 2824 2825 p = self.Process(target=self._putter, args=(manager.address, authkey)) 2826 p.start() 2827 p.join() 2828 queue = manager.get_queue() 2829 self.assertEqual(queue.get(), 'hello world') 2830 del queue 2831 finally: 2832 if hasattr(manager, "shutdown"): 2833 manager.shutdown() 2834 2835 manager = QueueManager( 2836 address=addr, authkey=authkey, serializer=SERIALIZER) 2837 try: 2838 manager.start() 2839 self.addCleanup(manager.shutdown) 2840 except OSError as e: 2841 if e.errno != errno.EADDRINUSE: 2842 raise 2843 # Retry after some time, in case the old socket was lingering 2844 # (sporadic failure on buildbots) 2845 time.sleep(1.0) 2846 manager = QueueManager( 2847 address=addr, authkey=authkey, serializer=SERIALIZER) 2848 if hasattr(manager, "shutdown"): 2849 self.addCleanup(manager.shutdown) 2850 2851# 2852# 2853# 2854 2855SENTINEL = latin('') 2856 2857class _TestConnection(BaseTestCase): 2858 2859 ALLOWED_TYPES = ('processes', 'threads') 2860 2861 @classmethod 2862 def _echo(cls, conn): 2863 for msg in iter(conn.recv_bytes, SENTINEL): 2864 conn.send_bytes(msg) 2865 conn.close() 2866 2867 def test_connection(self): 2868 conn, child_conn = self.Pipe() 2869 2870 p = self.Process(target=self._echo, args=(child_conn,)) 2871 p.daemon = True 2872 p.start() 2873 2874 seq = [1, 2.25, None] 2875 msg = latin('hello world') 2876 longmsg = msg * 10 2877 arr = array.array('i', list(range(4))) 2878 2879 if self.TYPE == 'processes': 2880 self.assertEqual(type(conn.fileno()), int) 2881 2882 self.assertEqual(conn.send(seq), None) 2883 self.assertEqual(conn.recv(), seq) 2884 2885 self.assertEqual(conn.send_bytes(msg), None) 2886 self.assertEqual(conn.recv_bytes(), msg) 2887 2888 if self.TYPE == 'processes': 2889 buffer = array.array('i', [0]*10) 2890 expected = list(arr) + [0] * (10 - len(arr)) 2891 self.assertEqual(conn.send_bytes(arr), None) 2892 self.assertEqual(conn.recv_bytes_into(buffer), 2893 len(arr) * buffer.itemsize) 2894 self.assertEqual(list(buffer), expected) 2895 2896 buffer = array.array('i', [0]*10) 2897 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr)) 2898 self.assertEqual(conn.send_bytes(arr), None) 2899 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize), 2900 len(arr) * buffer.itemsize) 2901 self.assertEqual(list(buffer), expected) 2902 2903 buffer = bytearray(latin(' ' * 40)) 2904 self.assertEqual(conn.send_bytes(longmsg), None) 2905 try: 2906 res = conn.recv_bytes_into(buffer) 2907 except multiprocessing.BufferTooShort as e: 2908 self.assertEqual(e.args, (longmsg,)) 2909 else: 2910 self.fail('expected BufferTooShort, got %s' % res) 2911 2912 poll = TimingWrapper(conn.poll) 2913 2914 self.assertEqual(poll(), False) 2915 self.assertTimingAlmostEqual(poll.elapsed, 0) 2916 2917 self.assertEqual(poll(-1), False) 2918 self.assertTimingAlmostEqual(poll.elapsed, 0) 2919 2920 self.assertEqual(poll(TIMEOUT1), False) 2921 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1) 2922 2923 conn.send(None) 2924 time.sleep(.1) 2925 2926 self.assertEqual(poll(TIMEOUT1), True) 2927 self.assertTimingAlmostEqual(poll.elapsed, 0) 2928 2929 self.assertEqual(conn.recv(), None) 2930 2931 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb 2932 conn.send_bytes(really_big_msg) 2933 self.assertEqual(conn.recv_bytes(), really_big_msg) 2934 2935 conn.send_bytes(SENTINEL) # tell child to quit 2936 child_conn.close() 2937 2938 if self.TYPE == 'processes': 2939 self.assertEqual(conn.readable, True) 2940 self.assertEqual(conn.writable, True) 2941 self.assertRaises(EOFError, conn.recv) 2942 self.assertRaises(EOFError, conn.recv_bytes) 2943 2944 p.join() 2945 2946 def test_duplex_false(self): 2947 reader, writer = self.Pipe(duplex=False) 2948 self.assertEqual(writer.send(1), None) 2949 self.assertEqual(reader.recv(), 1) 2950 if self.TYPE == 'processes': 2951 self.assertEqual(reader.readable, True) 2952 self.assertEqual(reader.writable, False) 2953 self.assertEqual(writer.readable, False) 2954 self.assertEqual(writer.writable, True) 2955 self.assertRaises(OSError, reader.send, 2) 2956 self.assertRaises(OSError, writer.recv) 2957 self.assertRaises(OSError, writer.poll) 2958 2959 def test_spawn_close(self): 2960 # We test that a pipe connection can be closed by parent 2961 # process immediately after child is spawned. On Windows this 2962 # would have sometimes failed on old versions because 2963 # child_conn would be closed before the child got a chance to 2964 # duplicate it. 2965 conn, child_conn = self.Pipe() 2966 2967 p = self.Process(target=self._echo, args=(child_conn,)) 2968 p.daemon = True 2969 p.start() 2970 child_conn.close() # this might complete before child initializes 2971 2972 msg = latin('hello') 2973 conn.send_bytes(msg) 2974 self.assertEqual(conn.recv_bytes(), msg) 2975 2976 conn.send_bytes(SENTINEL) 2977 conn.close() 2978 p.join() 2979 2980 def test_sendbytes(self): 2981 if self.TYPE != 'processes': 2982 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 2983 2984 msg = latin('abcdefghijklmnopqrstuvwxyz') 2985 a, b = self.Pipe() 2986 2987 a.send_bytes(msg) 2988 self.assertEqual(b.recv_bytes(), msg) 2989 2990 a.send_bytes(msg, 5) 2991 self.assertEqual(b.recv_bytes(), msg[5:]) 2992 2993 a.send_bytes(msg, 7, 8) 2994 self.assertEqual(b.recv_bytes(), msg[7:7+8]) 2995 2996 a.send_bytes(msg, 26) 2997 self.assertEqual(b.recv_bytes(), latin('')) 2998 2999 a.send_bytes(msg, 26, 0) 3000 self.assertEqual(b.recv_bytes(), latin('')) 3001 3002 self.assertRaises(ValueError, a.send_bytes, msg, 27) 3003 3004 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5) 3005 3006 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1) 3007 3008 self.assertRaises(ValueError, a.send_bytes, msg, -1) 3009 3010 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1) 3011 3012 @classmethod 3013 def _is_fd_assigned(cls, fd): 3014 try: 3015 os.fstat(fd) 3016 except OSError as e: 3017 if e.errno == errno.EBADF: 3018 return False 3019 raise 3020 else: 3021 return True 3022 3023 @classmethod 3024 def _writefd(cls, conn, data, create_dummy_fds=False): 3025 if create_dummy_fds: 3026 for i in range(0, 256): 3027 if not cls._is_fd_assigned(i): 3028 os.dup2(conn.fileno(), i) 3029 fd = reduction.recv_handle(conn) 3030 if msvcrt: 3031 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY) 3032 os.write(fd, data) 3033 os.close(fd) 3034 3035 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 3036 def test_fd_transfer(self): 3037 if self.TYPE != 'processes': 3038 self.skipTest("only makes sense with processes") 3039 conn, child_conn = self.Pipe(duplex=True) 3040 3041 p = self.Process(target=self._writefd, args=(child_conn, b"foo")) 3042 p.daemon = True 3043 p.start() 3044 self.addCleanup(test.support.unlink, test.support.TESTFN) 3045 with open(test.support.TESTFN, "wb") as f: 3046 fd = f.fileno() 3047 if msvcrt: 3048 fd = msvcrt.get_osfhandle(fd) 3049 reduction.send_handle(conn, fd, p.pid) 3050 p.join() 3051 with open(test.support.TESTFN, "rb") as f: 3052 self.assertEqual(f.read(), b"foo") 3053 3054 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 3055 @unittest.skipIf(sys.platform == "win32", 3056 "test semantics don't make sense on Windows") 3057 @unittest.skipIf(MAXFD <= 256, 3058 "largest assignable fd number is too small") 3059 @unittest.skipUnless(hasattr(os, "dup2"), 3060 "test needs os.dup2()") 3061 def test_large_fd_transfer(self): 3062 # With fd > 256 (issue #11657) 3063 if self.TYPE != 'processes': 3064 self.skipTest("only makes sense with processes") 3065 conn, child_conn = self.Pipe(duplex=True) 3066 3067 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True)) 3068 p.daemon = True 3069 p.start() 3070 self.addCleanup(test.support.unlink, test.support.TESTFN) 3071 with open(test.support.TESTFN, "wb") as f: 3072 fd = f.fileno() 3073 for newfd in range(256, MAXFD): 3074 if not self._is_fd_assigned(newfd): 3075 break 3076 else: 3077 self.fail("could not find an unassigned large file descriptor") 3078 os.dup2(fd, newfd) 3079 try: 3080 reduction.send_handle(conn, newfd, p.pid) 3081 finally: 3082 os.close(newfd) 3083 p.join() 3084 with open(test.support.TESTFN, "rb") as f: 3085 self.assertEqual(f.read(), b"bar") 3086 3087 @classmethod 3088 def _send_data_without_fd(self, conn): 3089 os.write(conn.fileno(), b"\0") 3090 3091 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 3092 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows") 3093 def test_missing_fd_transfer(self): 3094 # Check that exception is raised when received data is not 3095 # accompanied by a file descriptor in ancillary data. 3096 if self.TYPE != 'processes': 3097 self.skipTest("only makes sense with processes") 3098 conn, child_conn = self.Pipe(duplex=True) 3099 3100 p = self.Process(target=self._send_data_without_fd, args=(child_conn,)) 3101 p.daemon = True 3102 p.start() 3103 self.assertRaises(RuntimeError, reduction.recv_handle, conn) 3104 p.join() 3105 3106 def test_context(self): 3107 a, b = self.Pipe() 3108 3109 with a, b: 3110 a.send(1729) 3111 self.assertEqual(b.recv(), 1729) 3112 if self.TYPE == 'processes': 3113 self.assertFalse(a.closed) 3114 self.assertFalse(b.closed) 3115 3116 if self.TYPE == 'processes': 3117 self.assertTrue(a.closed) 3118 self.assertTrue(b.closed) 3119 self.assertRaises(OSError, a.recv) 3120 self.assertRaises(OSError, b.recv) 3121 3122class _TestListener(BaseTestCase): 3123 3124 ALLOWED_TYPES = ('processes',) 3125 3126 def test_multiple_bind(self): 3127 for family in self.connection.families: 3128 l = self.connection.Listener(family=family) 3129 self.addCleanup(l.close) 3130 self.assertRaises(OSError, self.connection.Listener, 3131 l.address, family) 3132 3133 def test_context(self): 3134 with self.connection.Listener() as l: 3135 with self.connection.Client(l.address) as c: 3136 with l.accept() as d: 3137 c.send(1729) 3138 self.assertEqual(d.recv(), 1729) 3139 3140 if self.TYPE == 'processes': 3141 self.assertRaises(OSError, l.accept) 3142 3143class _TestListenerClient(BaseTestCase): 3144 3145 ALLOWED_TYPES = ('processes', 'threads') 3146 3147 @classmethod 3148 def _test(cls, address): 3149 conn = cls.connection.Client(address) 3150 conn.send('hello') 3151 conn.close() 3152 3153 def test_listener_client(self): 3154 for family in self.connection.families: 3155 l = self.connection.Listener(family=family) 3156 p = self.Process(target=self._test, args=(l.address,)) 3157 p.daemon = True 3158 p.start() 3159 conn = l.accept() 3160 self.assertEqual(conn.recv(), 'hello') 3161 p.join() 3162 l.close() 3163 3164 def test_issue14725(self): 3165 l = self.connection.Listener() 3166 p = self.Process(target=self._test, args=(l.address,)) 3167 p.daemon = True 3168 p.start() 3169 time.sleep(1) 3170 # On Windows the client process should by now have connected, 3171 # written data and closed the pipe handle by now. This causes 3172 # ConnectNamdedPipe() to fail with ERROR_NO_DATA. See Issue 3173 # 14725. 3174 conn = l.accept() 3175 self.assertEqual(conn.recv(), 'hello') 3176 conn.close() 3177 p.join() 3178 l.close() 3179 3180 def test_issue16955(self): 3181 for fam in self.connection.families: 3182 l = self.connection.Listener(family=fam) 3183 c = self.connection.Client(l.address) 3184 a = l.accept() 3185 a.send_bytes(b"hello") 3186 self.assertTrue(c.poll(1)) 3187 a.close() 3188 c.close() 3189 l.close() 3190 3191class _TestPoll(BaseTestCase): 3192 3193 ALLOWED_TYPES = ('processes', 'threads') 3194 3195 def test_empty_string(self): 3196 a, b = self.Pipe() 3197 self.assertEqual(a.poll(), False) 3198 b.send_bytes(b'') 3199 self.assertEqual(a.poll(), True) 3200 self.assertEqual(a.poll(), True) 3201 3202 @classmethod 3203 def _child_strings(cls, conn, strings): 3204 for s in strings: 3205 time.sleep(0.1) 3206 conn.send_bytes(s) 3207 conn.close() 3208 3209 def test_strings(self): 3210 strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop') 3211 a, b = self.Pipe() 3212 p = self.Process(target=self._child_strings, args=(b, strings)) 3213 p.start() 3214 3215 for s in strings: 3216 for i in range(200): 3217 if a.poll(0.01): 3218 break 3219 x = a.recv_bytes() 3220 self.assertEqual(s, x) 3221 3222 p.join() 3223 3224 @classmethod 3225 def _child_boundaries(cls, r): 3226 # Polling may "pull" a message in to the child process, but we 3227 # don't want it to pull only part of a message, as that would 3228 # corrupt the pipe for any other processes which might later 3229 # read from it. 3230 r.poll(5) 3231 3232 def test_boundaries(self): 3233 r, w = self.Pipe(False) 3234 p = self.Process(target=self._child_boundaries, args=(r,)) 3235 p.start() 3236 time.sleep(2) 3237 L = [b"first", b"second"] 3238 for obj in L: 3239 w.send_bytes(obj) 3240 w.close() 3241 p.join() 3242 self.assertIn(r.recv_bytes(), L) 3243 3244 @classmethod 3245 def _child_dont_merge(cls, b): 3246 b.send_bytes(b'a') 3247 b.send_bytes(b'b') 3248 b.send_bytes(b'cd') 3249 3250 def test_dont_merge(self): 3251 a, b = self.Pipe() 3252 self.assertEqual(a.poll(0.0), False) 3253 self.assertEqual(a.poll(0.1), False) 3254 3255 p = self.Process(target=self._child_dont_merge, args=(b,)) 3256 p.start() 3257 3258 self.assertEqual(a.recv_bytes(), b'a') 3259 self.assertEqual(a.poll(1.0), True) 3260 self.assertEqual(a.poll(1.0), True) 3261 self.assertEqual(a.recv_bytes(), b'b') 3262 self.assertEqual(a.poll(1.0), True) 3263 self.assertEqual(a.poll(1.0), True) 3264 self.assertEqual(a.poll(0.0), True) 3265 self.assertEqual(a.recv_bytes(), b'cd') 3266 3267 p.join() 3268 3269# 3270# Test of sending connection and socket objects between processes 3271# 3272 3273@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 3274class _TestPicklingConnections(BaseTestCase): 3275 3276 ALLOWED_TYPES = ('processes',) 3277 3278 @classmethod 3279 def tearDownClass(cls): 3280 from multiprocessing import resource_sharer 3281 resource_sharer.stop(timeout=TIMEOUT) 3282 3283 @classmethod 3284 def _listener(cls, conn, families): 3285 for fam in families: 3286 l = cls.connection.Listener(family=fam) 3287 conn.send(l.address) 3288 new_conn = l.accept() 3289 conn.send(new_conn) 3290 new_conn.close() 3291 l.close() 3292 3293 l = socket.socket() 3294 l.bind((test.support.HOST, 0)) 3295 l.listen() 3296 conn.send(l.getsockname()) 3297 new_conn, addr = l.accept() 3298 conn.send(new_conn) 3299 new_conn.close() 3300 l.close() 3301 3302 conn.recv() 3303 3304 @classmethod 3305 def _remote(cls, conn): 3306 for (address, msg) in iter(conn.recv, None): 3307 client = cls.connection.Client(address) 3308 client.send(msg.upper()) 3309 client.close() 3310 3311 address, msg = conn.recv() 3312 client = socket.socket() 3313 client.connect(address) 3314 client.sendall(msg.upper()) 3315 client.close() 3316 3317 conn.close() 3318 3319 def test_pickling(self): 3320 families = self.connection.families 3321 3322 lconn, lconn0 = self.Pipe() 3323 lp = self.Process(target=self._listener, args=(lconn0, families)) 3324 lp.daemon = True 3325 lp.start() 3326 lconn0.close() 3327 3328 rconn, rconn0 = self.Pipe() 3329 rp = self.Process(target=self._remote, args=(rconn0,)) 3330 rp.daemon = True 3331 rp.start() 3332 rconn0.close() 3333 3334 for fam in families: 3335 msg = ('This connection uses family %s' % fam).encode('ascii') 3336 address = lconn.recv() 3337 rconn.send((address, msg)) 3338 new_conn = lconn.recv() 3339 self.assertEqual(new_conn.recv(), msg.upper()) 3340 3341 rconn.send(None) 3342 3343 msg = latin('This connection uses a normal socket') 3344 address = lconn.recv() 3345 rconn.send((address, msg)) 3346 new_conn = lconn.recv() 3347 buf = [] 3348 while True: 3349 s = new_conn.recv(100) 3350 if not s: 3351 break 3352 buf.append(s) 3353 buf = b''.join(buf) 3354 self.assertEqual(buf, msg.upper()) 3355 new_conn.close() 3356 3357 lconn.send(None) 3358 3359 rconn.close() 3360 lconn.close() 3361 3362 lp.join() 3363 rp.join() 3364 3365 @classmethod 3366 def child_access(cls, conn): 3367 w = conn.recv() 3368 w.send('all is well') 3369 w.close() 3370 3371 r = conn.recv() 3372 msg = r.recv() 3373 conn.send(msg*2) 3374 3375 conn.close() 3376 3377 def test_access(self): 3378 # On Windows, if we do not specify a destination pid when 3379 # using DupHandle then we need to be careful to use the 3380 # correct access flags for DuplicateHandle(), or else 3381 # DupHandle.detach() will raise PermissionError. For example, 3382 # for a read only pipe handle we should use 3383 # access=FILE_GENERIC_READ. (Unfortunately 3384 # DUPLICATE_SAME_ACCESS does not work.) 3385 conn, child_conn = self.Pipe() 3386 p = self.Process(target=self.child_access, args=(child_conn,)) 3387 p.daemon = True 3388 p.start() 3389 child_conn.close() 3390 3391 r, w = self.Pipe(duplex=False) 3392 conn.send(w) 3393 w.close() 3394 self.assertEqual(r.recv(), 'all is well') 3395 r.close() 3396 3397 r, w = self.Pipe(duplex=False) 3398 conn.send(r) 3399 r.close() 3400 w.send('foobar') 3401 w.close() 3402 self.assertEqual(conn.recv(), 'foobar'*2) 3403 3404 p.join() 3405 3406# 3407# 3408# 3409 3410class _TestHeap(BaseTestCase): 3411 3412 ALLOWED_TYPES = ('processes',) 3413 3414 def test_heap(self): 3415 iterations = 5000 3416 maxblocks = 50 3417 blocks = [] 3418 3419 # create and destroy lots of blocks of different sizes 3420 for i in range(iterations): 3421 size = int(random.lognormvariate(0, 1) * 1000) 3422 b = multiprocessing.heap.BufferWrapper(size) 3423 blocks.append(b) 3424 if len(blocks) > maxblocks: 3425 i = random.randrange(maxblocks) 3426 del blocks[i] 3427 3428 # get the heap object 3429 heap = multiprocessing.heap.BufferWrapper._heap 3430 3431 # verify the state of the heap 3432 all = [] 3433 occupied = 0 3434 heap._lock.acquire() 3435 self.addCleanup(heap._lock.release) 3436 for L in list(heap._len_to_seq.values()): 3437 for arena, start, stop in L: 3438 all.append((heap._arenas.index(arena), start, stop, 3439 stop-start, 'free')) 3440 for arena, start, stop in heap._allocated_blocks: 3441 all.append((heap._arenas.index(arena), start, stop, 3442 stop-start, 'occupied')) 3443 occupied += (stop-start) 3444 3445 all.sort() 3446 3447 for i in range(len(all)-1): 3448 (arena, start, stop) = all[i][:3] 3449 (narena, nstart, nstop) = all[i+1][:3] 3450 self.assertTrue((arena != narena and nstart == 0) or 3451 (stop == nstart)) 3452 3453 def test_free_from_gc(self): 3454 # Check that freeing of blocks by the garbage collector doesn't deadlock 3455 # (issue #12352). 3456 # Make sure the GC is enabled, and set lower collection thresholds to 3457 # make collections more frequent (and increase the probability of 3458 # deadlock). 3459 if not gc.isenabled(): 3460 gc.enable() 3461 self.addCleanup(gc.disable) 3462 thresholds = gc.get_threshold() 3463 self.addCleanup(gc.set_threshold, *thresholds) 3464 gc.set_threshold(10) 3465 3466 # perform numerous block allocations, with cyclic references to make 3467 # sure objects are collected asynchronously by the gc 3468 for i in range(5000): 3469 a = multiprocessing.heap.BufferWrapper(1) 3470 b = multiprocessing.heap.BufferWrapper(1) 3471 # circular references 3472 a.buddy = b 3473 b.buddy = a 3474 3475# 3476# 3477# 3478 3479class _Foo(Structure): 3480 _fields_ = [ 3481 ('x', c_int), 3482 ('y', c_double), 3483 ('z', c_longlong,) 3484 ] 3485 3486class _TestSharedCTypes(BaseTestCase): 3487 3488 ALLOWED_TYPES = ('processes',) 3489 3490 def setUp(self): 3491 if not HAS_SHAREDCTYPES: 3492 self.skipTest("requires multiprocessing.sharedctypes") 3493 3494 @classmethod 3495 def _double(cls, x, y, z, foo, arr, string): 3496 x.value *= 2 3497 y.value *= 2 3498 z.value *= 2 3499 foo.x *= 2 3500 foo.y *= 2 3501 string.value *= 2 3502 for i in range(len(arr)): 3503 arr[i] *= 2 3504 3505 def test_sharedctypes(self, lock=False): 3506 x = Value('i', 7, lock=lock) 3507 y = Value(c_double, 1.0/3.0, lock=lock) 3508 z = Value(c_longlong, 2 ** 33, lock=lock) 3509 foo = Value(_Foo, 3, 2, lock=lock) 3510 arr = self.Array('d', list(range(10)), lock=lock) 3511 string = self.Array('c', 20, lock=lock) 3512 string.value = latin('hello') 3513 3514 p = self.Process(target=self._double, args=(x, y, z, foo, arr, string)) 3515 p.daemon = True 3516 p.start() 3517 p.join() 3518 3519 self.assertEqual(x.value, 14) 3520 self.assertAlmostEqual(y.value, 2.0/3.0) 3521 self.assertEqual(z.value, 2 ** 34) 3522 self.assertEqual(foo.x, 6) 3523 self.assertAlmostEqual(foo.y, 4.0) 3524 for i in range(10): 3525 self.assertAlmostEqual(arr[i], i*2) 3526 self.assertEqual(string.value, latin('hellohello')) 3527 3528 def test_synchronize(self): 3529 self.test_sharedctypes(lock=True) 3530 3531 def test_copy(self): 3532 foo = _Foo(2, 5.0, 2 ** 33) 3533 bar = copy(foo) 3534 foo.x = 0 3535 foo.y = 0 3536 foo.z = 0 3537 self.assertEqual(bar.x, 2) 3538 self.assertAlmostEqual(bar.y, 5.0) 3539 self.assertEqual(bar.z, 2 ** 33) 3540 3541# 3542# 3543# 3544 3545class _TestFinalize(BaseTestCase): 3546 3547 ALLOWED_TYPES = ('processes',) 3548 3549 def setUp(self): 3550 self.registry_backup = util._finalizer_registry.copy() 3551 util._finalizer_registry.clear() 3552 3553 def tearDown(self): 3554 self.assertFalse(util._finalizer_registry) 3555 util._finalizer_registry.update(self.registry_backup) 3556 3557 @classmethod 3558 def _test_finalize(cls, conn): 3559 class Foo(object): 3560 pass 3561 3562 a = Foo() 3563 util.Finalize(a, conn.send, args=('a',)) 3564 del a # triggers callback for a 3565 3566 b = Foo() 3567 close_b = util.Finalize(b, conn.send, args=('b',)) 3568 close_b() # triggers callback for b 3569 close_b() # does nothing because callback has already been called 3570 del b # does nothing because callback has already been called 3571 3572 c = Foo() 3573 util.Finalize(c, conn.send, args=('c',)) 3574 3575 d10 = Foo() 3576 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1) 3577 3578 d01 = Foo() 3579 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0) 3580 d02 = Foo() 3581 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0) 3582 d03 = Foo() 3583 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0) 3584 3585 util.Finalize(None, conn.send, args=('e',), exitpriority=-10) 3586 3587 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100) 3588 3589 # call multiprocessing's cleanup function then exit process without 3590 # garbage collecting locals 3591 util._exit_function() 3592 conn.close() 3593 os._exit(0) 3594 3595 def test_finalize(self): 3596 conn, child_conn = self.Pipe() 3597 3598 p = self.Process(target=self._test_finalize, args=(child_conn,)) 3599 p.daemon = True 3600 p.start() 3601 p.join() 3602 3603 result = [obj for obj in iter(conn.recv, 'STOP')] 3604 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e']) 3605 3606 def test_thread_safety(self): 3607 # bpo-24484: _run_finalizers() should be thread-safe 3608 def cb(): 3609 pass 3610 3611 class Foo(object): 3612 def __init__(self): 3613 self.ref = self # create reference cycle 3614 # insert finalizer at random key 3615 util.Finalize(self, cb, exitpriority=random.randint(1, 100)) 3616 3617 finish = False 3618 exc = None 3619 3620 def run_finalizers(): 3621 nonlocal exc 3622 while not finish: 3623 time.sleep(random.random() * 1e-1) 3624 try: 3625 # A GC run will eventually happen during this, 3626 # collecting stale Foo's and mutating the registry 3627 util._run_finalizers() 3628 except Exception as e: 3629 exc = e 3630 3631 def make_finalizers(): 3632 nonlocal exc 3633 d = {} 3634 while not finish: 3635 try: 3636 # Old Foo's get gradually replaced and later 3637 # collected by the GC (because of the cyclic ref) 3638 d[random.getrandbits(5)] = {Foo() for i in range(10)} 3639 except Exception as e: 3640 exc = e 3641 d.clear() 3642 3643 old_interval = sys.getswitchinterval() 3644 old_threshold = gc.get_threshold() 3645 try: 3646 sys.setswitchinterval(1e-6) 3647 gc.set_threshold(5, 5, 5) 3648 threads = [threading.Thread(target=run_finalizers), 3649 threading.Thread(target=make_finalizers)] 3650 with test.support.start_threads(threads): 3651 time.sleep(4.0) # Wait a bit to trigger race condition 3652 finish = True 3653 if exc is not None: 3654 raise exc 3655 finally: 3656 sys.setswitchinterval(old_interval) 3657 gc.set_threshold(*old_threshold) 3658 gc.collect() # Collect remaining Foo's 3659 3660 3661# 3662# Test that from ... import * works for each module 3663# 3664 3665class _TestImportStar(unittest.TestCase): 3666 3667 def get_module_names(self): 3668 import glob 3669 folder = os.path.dirname(multiprocessing.__file__) 3670 pattern = os.path.join(folder, '*.py') 3671 files = glob.glob(pattern) 3672 modules = [os.path.splitext(os.path.split(f)[1])[0] for f in files] 3673 modules = ['multiprocessing.' + m for m in modules] 3674 modules.remove('multiprocessing.__init__') 3675 modules.append('multiprocessing') 3676 return modules 3677 3678 def test_import(self): 3679 modules = self.get_module_names() 3680 if sys.platform == 'win32': 3681 modules.remove('multiprocessing.popen_fork') 3682 modules.remove('multiprocessing.popen_forkserver') 3683 modules.remove('multiprocessing.popen_spawn_posix') 3684 else: 3685 modules.remove('multiprocessing.popen_spawn_win32') 3686 if not HAS_REDUCTION: 3687 modules.remove('multiprocessing.popen_forkserver') 3688 3689 if c_int is None: 3690 # This module requires _ctypes 3691 modules.remove('multiprocessing.sharedctypes') 3692 3693 for name in modules: 3694 __import__(name) 3695 mod = sys.modules[name] 3696 self.assertTrue(hasattr(mod, '__all__'), name) 3697 3698 for attr in mod.__all__: 3699 self.assertTrue( 3700 hasattr(mod, attr), 3701 '%r does not have attribute %r' % (mod, attr) 3702 ) 3703 3704# 3705# Quick test that logging works -- does not test logging output 3706# 3707 3708class _TestLogging(BaseTestCase): 3709 3710 ALLOWED_TYPES = ('processes',) 3711 3712 def test_enable_logging(self): 3713 logger = multiprocessing.get_logger() 3714 logger.setLevel(util.SUBWARNING) 3715 self.assertTrue(logger is not None) 3716 logger.debug('this will not be printed') 3717 logger.info('nor will this') 3718 logger.setLevel(LOG_LEVEL) 3719 3720 @classmethod 3721 def _test_level(cls, conn): 3722 logger = multiprocessing.get_logger() 3723 conn.send(logger.getEffectiveLevel()) 3724 3725 def test_level(self): 3726 LEVEL1 = 32 3727 LEVEL2 = 37 3728 3729 logger = multiprocessing.get_logger() 3730 root_logger = logging.getLogger() 3731 root_level = root_logger.level 3732 3733 reader, writer = multiprocessing.Pipe(duplex=False) 3734 3735 logger.setLevel(LEVEL1) 3736 p = self.Process(target=self._test_level, args=(writer,)) 3737 p.start() 3738 self.assertEqual(LEVEL1, reader.recv()) 3739 p.join() 3740 p.close() 3741 3742 logger.setLevel(logging.NOTSET) 3743 root_logger.setLevel(LEVEL2) 3744 p = self.Process(target=self._test_level, args=(writer,)) 3745 p.start() 3746 self.assertEqual(LEVEL2, reader.recv()) 3747 p.join() 3748 p.close() 3749 3750 root_logger.setLevel(root_level) 3751 logger.setLevel(level=LOG_LEVEL) 3752 3753 3754# class _TestLoggingProcessName(BaseTestCase): 3755# 3756# def handle(self, record): 3757# assert record.processName == multiprocessing.current_process().name 3758# self.__handled = True 3759# 3760# def test_logging(self): 3761# handler = logging.Handler() 3762# handler.handle = self.handle 3763# self.__handled = False 3764# # Bypass getLogger() and side-effects 3765# logger = logging.getLoggerClass()( 3766# 'multiprocessing.test.TestLoggingProcessName') 3767# logger.addHandler(handler) 3768# logger.propagate = False 3769# 3770# logger.warn('foo') 3771# assert self.__handled 3772 3773# 3774# Check that Process.join() retries if os.waitpid() fails with EINTR 3775# 3776 3777class _TestPollEintr(BaseTestCase): 3778 3779 ALLOWED_TYPES = ('processes',) 3780 3781 @classmethod 3782 def _killer(cls, pid): 3783 time.sleep(0.1) 3784 os.kill(pid, signal.SIGUSR1) 3785 3786 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') 3787 def test_poll_eintr(self): 3788 got_signal = [False] 3789 def record(*args): 3790 got_signal[0] = True 3791 pid = os.getpid() 3792 oldhandler = signal.signal(signal.SIGUSR1, record) 3793 try: 3794 killer = self.Process(target=self._killer, args=(pid,)) 3795 killer.start() 3796 try: 3797 p = self.Process(target=time.sleep, args=(2,)) 3798 p.start() 3799 p.join() 3800 finally: 3801 killer.join() 3802 self.assertTrue(got_signal[0]) 3803 self.assertEqual(p.exitcode, 0) 3804 finally: 3805 signal.signal(signal.SIGUSR1, oldhandler) 3806 3807# 3808# Test to verify handle verification, see issue 3321 3809# 3810 3811class TestInvalidHandle(unittest.TestCase): 3812 3813 @unittest.skipIf(WIN32, "skipped on Windows") 3814 def test_invalid_handles(self): 3815 conn = multiprocessing.connection.Connection(44977608) 3816 # check that poll() doesn't crash 3817 try: 3818 conn.poll() 3819 except (ValueError, OSError): 3820 pass 3821 finally: 3822 # Hack private attribute _handle to avoid printing an error 3823 # in conn.__del__ 3824 conn._handle = None 3825 self.assertRaises((ValueError, OSError), 3826 multiprocessing.connection.Connection, -1) 3827 3828 3829 3830class OtherTest(unittest.TestCase): 3831 # TODO: add more tests for deliver/answer challenge. 3832 def test_deliver_challenge_auth_failure(self): 3833 class _FakeConnection(object): 3834 def recv_bytes(self, size): 3835 return b'something bogus' 3836 def send_bytes(self, data): 3837 pass 3838 self.assertRaises(multiprocessing.AuthenticationError, 3839 multiprocessing.connection.deliver_challenge, 3840 _FakeConnection(), b'abc') 3841 3842 def test_answer_challenge_auth_failure(self): 3843 class _FakeConnection(object): 3844 def __init__(self): 3845 self.count = 0 3846 def recv_bytes(self, size): 3847 self.count += 1 3848 if self.count == 1: 3849 return multiprocessing.connection.CHALLENGE 3850 elif self.count == 2: 3851 return b'something bogus' 3852 return b'' 3853 def send_bytes(self, data): 3854 pass 3855 self.assertRaises(multiprocessing.AuthenticationError, 3856 multiprocessing.connection.answer_challenge, 3857 _FakeConnection(), b'abc') 3858 3859# 3860# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585 3861# 3862 3863def initializer(ns): 3864 ns.test += 1 3865 3866class TestInitializers(unittest.TestCase): 3867 def setUp(self): 3868 self.mgr = multiprocessing.Manager() 3869 self.ns = self.mgr.Namespace() 3870 self.ns.test = 0 3871 3872 def tearDown(self): 3873 self.mgr.shutdown() 3874 self.mgr.join() 3875 3876 def test_manager_initializer(self): 3877 m = multiprocessing.managers.SyncManager() 3878 self.assertRaises(TypeError, m.start, 1) 3879 m.start(initializer, (self.ns,)) 3880 self.assertEqual(self.ns.test, 1) 3881 m.shutdown() 3882 m.join() 3883 3884 def test_pool_initializer(self): 3885 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1) 3886 p = multiprocessing.Pool(1, initializer, (self.ns,)) 3887 p.close() 3888 p.join() 3889 self.assertEqual(self.ns.test, 1) 3890 3891# 3892# Issue 5155, 5313, 5331: Test process in processes 3893# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior 3894# 3895 3896def _this_sub_process(q): 3897 try: 3898 item = q.get(block=False) 3899 except pyqueue.Empty: 3900 pass 3901 3902def _test_process(): 3903 queue = multiprocessing.Queue() 3904 subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,)) 3905 subProc.daemon = True 3906 subProc.start() 3907 subProc.join() 3908 3909def _afunc(x): 3910 return x*x 3911 3912def pool_in_process(): 3913 pool = multiprocessing.Pool(processes=4) 3914 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7]) 3915 pool.close() 3916 pool.join() 3917 3918class _file_like(object): 3919 def __init__(self, delegate): 3920 self._delegate = delegate 3921 self._pid = None 3922 3923 @property 3924 def cache(self): 3925 pid = os.getpid() 3926 # There are no race conditions since fork keeps only the running thread 3927 if pid != self._pid: 3928 self._pid = pid 3929 self._cache = [] 3930 return self._cache 3931 3932 def write(self, data): 3933 self.cache.append(data) 3934 3935 def flush(self): 3936 self._delegate.write(''.join(self.cache)) 3937 self._cache = [] 3938 3939class TestStdinBadfiledescriptor(unittest.TestCase): 3940 3941 def test_queue_in_process(self): 3942 proc = multiprocessing.Process(target=_test_process) 3943 proc.start() 3944 proc.join() 3945 3946 def test_pool_in_process(self): 3947 p = multiprocessing.Process(target=pool_in_process) 3948 p.start() 3949 p.join() 3950 3951 def test_flushing(self): 3952 sio = io.StringIO() 3953 flike = _file_like(sio) 3954 flike.write('foo') 3955 proc = multiprocessing.Process(target=lambda: flike.flush()) 3956 flike.flush() 3957 assert sio.getvalue() == 'foo' 3958 3959 3960class TestWait(unittest.TestCase): 3961 3962 @classmethod 3963 def _child_test_wait(cls, w, slow): 3964 for i in range(10): 3965 if slow: 3966 time.sleep(random.random()*0.1) 3967 w.send((i, os.getpid())) 3968 w.close() 3969 3970 def test_wait(self, slow=False): 3971 from multiprocessing.connection import wait 3972 readers = [] 3973 procs = [] 3974 messages = [] 3975 3976 for i in range(4): 3977 r, w = multiprocessing.Pipe(duplex=False) 3978 p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow)) 3979 p.daemon = True 3980 p.start() 3981 w.close() 3982 readers.append(r) 3983 procs.append(p) 3984 self.addCleanup(p.join) 3985 3986 while readers: 3987 for r in wait(readers): 3988 try: 3989 msg = r.recv() 3990 except EOFError: 3991 readers.remove(r) 3992 r.close() 3993 else: 3994 messages.append(msg) 3995 3996 messages.sort() 3997 expected = sorted((i, p.pid) for i in range(10) for p in procs) 3998 self.assertEqual(messages, expected) 3999 4000 @classmethod 4001 def _child_test_wait_socket(cls, address, slow): 4002 s = socket.socket() 4003 s.connect(address) 4004 for i in range(10): 4005 if slow: 4006 time.sleep(random.random()*0.1) 4007 s.sendall(('%s\n' % i).encode('ascii')) 4008 s.close() 4009 4010 def test_wait_socket(self, slow=False): 4011 from multiprocessing.connection import wait 4012 l = socket.socket() 4013 l.bind((test.support.HOST, 0)) 4014 l.listen() 4015 addr = l.getsockname() 4016 readers = [] 4017 procs = [] 4018 dic = {} 4019 4020 for i in range(4): 4021 p = multiprocessing.Process(target=self._child_test_wait_socket, 4022 args=(addr, slow)) 4023 p.daemon = True 4024 p.start() 4025 procs.append(p) 4026 self.addCleanup(p.join) 4027 4028 for i in range(4): 4029 r, _ = l.accept() 4030 readers.append(r) 4031 dic[r] = [] 4032 l.close() 4033 4034 while readers: 4035 for r in wait(readers): 4036 msg = r.recv(32) 4037 if not msg: 4038 readers.remove(r) 4039 r.close() 4040 else: 4041 dic[r].append(msg) 4042 4043 expected = ''.join('%s\n' % i for i in range(10)).encode('ascii') 4044 for v in dic.values(): 4045 self.assertEqual(b''.join(v), expected) 4046 4047 def test_wait_slow(self): 4048 self.test_wait(True) 4049 4050 def test_wait_socket_slow(self): 4051 self.test_wait_socket(True) 4052 4053 def test_wait_timeout(self): 4054 from multiprocessing.connection import wait 4055 4056 expected = 5 4057 a, b = multiprocessing.Pipe() 4058 4059 start = time.monotonic() 4060 res = wait([a, b], expected) 4061 delta = time.monotonic() - start 4062 4063 self.assertEqual(res, []) 4064 self.assertLess(delta, expected * 2) 4065 self.assertGreater(delta, expected * 0.5) 4066 4067 b.send(None) 4068 4069 start = time.monotonic() 4070 res = wait([a, b], 20) 4071 delta = time.monotonic() - start 4072 4073 self.assertEqual(res, [a]) 4074 self.assertLess(delta, 0.4) 4075 4076 @classmethod 4077 def signal_and_sleep(cls, sem, period): 4078 sem.release() 4079 time.sleep(period) 4080 4081 def test_wait_integer(self): 4082 from multiprocessing.connection import wait 4083 4084 expected = 3 4085 sorted_ = lambda l: sorted(l, key=lambda x: id(x)) 4086 sem = multiprocessing.Semaphore(0) 4087 a, b = multiprocessing.Pipe() 4088 p = multiprocessing.Process(target=self.signal_and_sleep, 4089 args=(sem, expected)) 4090 4091 p.start() 4092 self.assertIsInstance(p.sentinel, int) 4093 self.assertTrue(sem.acquire(timeout=20)) 4094 4095 start = time.monotonic() 4096 res = wait([a, p.sentinel, b], expected + 20) 4097 delta = time.monotonic() - start 4098 4099 self.assertEqual(res, [p.sentinel]) 4100 self.assertLess(delta, expected + 2) 4101 self.assertGreater(delta, expected - 2) 4102 4103 a.send(None) 4104 4105 start = time.monotonic() 4106 res = wait([a, p.sentinel, b], 20) 4107 delta = time.monotonic() - start 4108 4109 self.assertEqual(sorted_(res), sorted_([p.sentinel, b])) 4110 self.assertLess(delta, 0.4) 4111 4112 b.send(None) 4113 4114 start = time.monotonic() 4115 res = wait([a, p.sentinel, b], 20) 4116 delta = time.monotonic() - start 4117 4118 self.assertEqual(sorted_(res), sorted_([a, p.sentinel, b])) 4119 self.assertLess(delta, 0.4) 4120 4121 p.terminate() 4122 p.join() 4123 4124 def test_neg_timeout(self): 4125 from multiprocessing.connection import wait 4126 a, b = multiprocessing.Pipe() 4127 t = time.monotonic() 4128 res = wait([a], timeout=-1) 4129 t = time.monotonic() - t 4130 self.assertEqual(res, []) 4131 self.assertLess(t, 1) 4132 a.close() 4133 b.close() 4134 4135# 4136# Issue 14151: Test invalid family on invalid environment 4137# 4138 4139class TestInvalidFamily(unittest.TestCase): 4140 4141 @unittest.skipIf(WIN32, "skipped on Windows") 4142 def test_invalid_family(self): 4143 with self.assertRaises(ValueError): 4144 multiprocessing.connection.Listener(r'\\.\test') 4145 4146 @unittest.skipUnless(WIN32, "skipped on non-Windows platforms") 4147 def test_invalid_family_win32(self): 4148 with self.assertRaises(ValueError): 4149 multiprocessing.connection.Listener('/var/test.pipe') 4150 4151# 4152# Issue 12098: check sys.flags of child matches that for parent 4153# 4154 4155class TestFlags(unittest.TestCase): 4156 @classmethod 4157 def run_in_grandchild(cls, conn): 4158 conn.send(tuple(sys.flags)) 4159 4160 @classmethod 4161 def run_in_child(cls): 4162 import json 4163 r, w = multiprocessing.Pipe(duplex=False) 4164 p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,)) 4165 p.start() 4166 grandchild_flags = r.recv() 4167 p.join() 4168 r.close() 4169 w.close() 4170 flags = (tuple(sys.flags), grandchild_flags) 4171 print(json.dumps(flags)) 4172 4173 def test_flags(self): 4174 import json, subprocess 4175 # start child process using unusual flags 4176 prog = ('from test._test_multiprocessing import TestFlags; ' + 4177 'TestFlags.run_in_child()') 4178 data = subprocess.check_output( 4179 [sys.executable, '-E', '-S', '-O', '-c', prog]) 4180 child_flags, grandchild_flags = json.loads(data.decode('ascii')) 4181 self.assertEqual(child_flags, grandchild_flags) 4182 4183# 4184# Test interaction with socket timeouts - see Issue #6056 4185# 4186 4187class TestTimeouts(unittest.TestCase): 4188 @classmethod 4189 def _test_timeout(cls, child, address): 4190 time.sleep(1) 4191 child.send(123) 4192 child.close() 4193 conn = multiprocessing.connection.Client(address) 4194 conn.send(456) 4195 conn.close() 4196 4197 def test_timeout(self): 4198 old_timeout = socket.getdefaulttimeout() 4199 try: 4200 socket.setdefaulttimeout(0.1) 4201 parent, child = multiprocessing.Pipe(duplex=True) 4202 l = multiprocessing.connection.Listener(family='AF_INET') 4203 p = multiprocessing.Process(target=self._test_timeout, 4204 args=(child, l.address)) 4205 p.start() 4206 child.close() 4207 self.assertEqual(parent.recv(), 123) 4208 parent.close() 4209 conn = l.accept() 4210 self.assertEqual(conn.recv(), 456) 4211 conn.close() 4212 l.close() 4213 join_process(p) 4214 finally: 4215 socket.setdefaulttimeout(old_timeout) 4216 4217# 4218# Test what happens with no "if __name__ == '__main__'" 4219# 4220 4221class TestNoForkBomb(unittest.TestCase): 4222 def test_noforkbomb(self): 4223 sm = multiprocessing.get_start_method() 4224 name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py') 4225 if sm != 'fork': 4226 rc, out, err = test.support.script_helper.assert_python_failure(name, sm) 4227 self.assertEqual(out, b'') 4228 self.assertIn(b'RuntimeError', err) 4229 else: 4230 rc, out, err = test.support.script_helper.assert_python_ok(name, sm) 4231 self.assertEqual(out.rstrip(), b'123') 4232 self.assertEqual(err, b'') 4233 4234# 4235# Issue #17555: ForkAwareThreadLock 4236# 4237 4238class TestForkAwareThreadLock(unittest.TestCase): 4239 # We recursively start processes. Issue #17555 meant that the 4240 # after fork registry would get duplicate entries for the same 4241 # lock. The size of the registry at generation n was ~2**n. 4242 4243 @classmethod 4244 def child(cls, n, conn): 4245 if n > 1: 4246 p = multiprocessing.Process(target=cls.child, args=(n-1, conn)) 4247 p.start() 4248 conn.close() 4249 join_process(p) 4250 else: 4251 conn.send(len(util._afterfork_registry)) 4252 conn.close() 4253 4254 def test_lock(self): 4255 r, w = multiprocessing.Pipe(False) 4256 l = util.ForkAwareThreadLock() 4257 old_size = len(util._afterfork_registry) 4258 p = multiprocessing.Process(target=self.child, args=(5, w)) 4259 p.start() 4260 w.close() 4261 new_size = r.recv() 4262 join_process(p) 4263 self.assertLessEqual(new_size, old_size) 4264 4265# 4266# Check that non-forked child processes do not inherit unneeded fds/handles 4267# 4268 4269class TestCloseFds(unittest.TestCase): 4270 4271 def get_high_socket_fd(self): 4272 if WIN32: 4273 # The child process will not have any socket handles, so 4274 # calling socket.fromfd() should produce WSAENOTSOCK even 4275 # if there is a handle of the same number. 4276 return socket.socket().detach() 4277 else: 4278 # We want to produce a socket with an fd high enough that a 4279 # freshly created child process will not have any fds as high. 4280 fd = socket.socket().detach() 4281 to_close = [] 4282 while fd < 50: 4283 to_close.append(fd) 4284 fd = os.dup(fd) 4285 for x in to_close: 4286 os.close(x) 4287 return fd 4288 4289 def close(self, fd): 4290 if WIN32: 4291 socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=fd).close() 4292 else: 4293 os.close(fd) 4294 4295 @classmethod 4296 def _test_closefds(cls, conn, fd): 4297 try: 4298 s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM) 4299 except Exception as e: 4300 conn.send(e) 4301 else: 4302 s.close() 4303 conn.send(None) 4304 4305 def test_closefd(self): 4306 if not HAS_REDUCTION: 4307 raise unittest.SkipTest('requires fd pickling') 4308 4309 reader, writer = multiprocessing.Pipe() 4310 fd = self.get_high_socket_fd() 4311 try: 4312 p = multiprocessing.Process(target=self._test_closefds, 4313 args=(writer, fd)) 4314 p.start() 4315 writer.close() 4316 e = reader.recv() 4317 join_process(p) 4318 finally: 4319 self.close(fd) 4320 writer.close() 4321 reader.close() 4322 4323 if multiprocessing.get_start_method() == 'fork': 4324 self.assertIs(e, None) 4325 else: 4326 WSAENOTSOCK = 10038 4327 self.assertIsInstance(e, OSError) 4328 self.assertTrue(e.errno == errno.EBADF or 4329 e.winerror == WSAENOTSOCK, e) 4330 4331# 4332# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc 4333# 4334 4335class TestIgnoreEINTR(unittest.TestCase): 4336 4337 # Sending CONN_MAX_SIZE bytes into a multiprocessing pipe must block 4338 CONN_MAX_SIZE = max(support.PIPE_MAX_SIZE, support.SOCK_MAX_SIZE) 4339 4340 @classmethod 4341 def _test_ignore(cls, conn): 4342 def handler(signum, frame): 4343 pass 4344 signal.signal(signal.SIGUSR1, handler) 4345 conn.send('ready') 4346 x = conn.recv() 4347 conn.send(x) 4348 conn.send_bytes(b'x' * cls.CONN_MAX_SIZE) 4349 4350 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') 4351 def test_ignore(self): 4352 conn, child_conn = multiprocessing.Pipe() 4353 try: 4354 p = multiprocessing.Process(target=self._test_ignore, 4355 args=(child_conn,)) 4356 p.daemon = True 4357 p.start() 4358 child_conn.close() 4359 self.assertEqual(conn.recv(), 'ready') 4360 time.sleep(0.1) 4361 os.kill(p.pid, signal.SIGUSR1) 4362 time.sleep(0.1) 4363 conn.send(1234) 4364 self.assertEqual(conn.recv(), 1234) 4365 time.sleep(0.1) 4366 os.kill(p.pid, signal.SIGUSR1) 4367 self.assertEqual(conn.recv_bytes(), b'x' * self.CONN_MAX_SIZE) 4368 time.sleep(0.1) 4369 p.join() 4370 finally: 4371 conn.close() 4372 4373 @classmethod 4374 def _test_ignore_listener(cls, conn): 4375 def handler(signum, frame): 4376 pass 4377 signal.signal(signal.SIGUSR1, handler) 4378 with multiprocessing.connection.Listener() as l: 4379 conn.send(l.address) 4380 a = l.accept() 4381 a.send('welcome') 4382 4383 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') 4384 def test_ignore_listener(self): 4385 conn, child_conn = multiprocessing.Pipe() 4386 try: 4387 p = multiprocessing.Process(target=self._test_ignore_listener, 4388 args=(child_conn,)) 4389 p.daemon = True 4390 p.start() 4391 child_conn.close() 4392 address = conn.recv() 4393 time.sleep(0.1) 4394 os.kill(p.pid, signal.SIGUSR1) 4395 time.sleep(0.1) 4396 client = multiprocessing.connection.Client(address) 4397 self.assertEqual(client.recv(), 'welcome') 4398 p.join() 4399 finally: 4400 conn.close() 4401 4402class TestStartMethod(unittest.TestCase): 4403 @classmethod 4404 def _check_context(cls, conn): 4405 conn.send(multiprocessing.get_start_method()) 4406 4407 def check_context(self, ctx): 4408 r, w = ctx.Pipe(duplex=False) 4409 p = ctx.Process(target=self._check_context, args=(w,)) 4410 p.start() 4411 w.close() 4412 child_method = r.recv() 4413 r.close() 4414 p.join() 4415 self.assertEqual(child_method, ctx.get_start_method()) 4416 4417 def test_context(self): 4418 for method in ('fork', 'spawn', 'forkserver'): 4419 try: 4420 ctx = multiprocessing.get_context(method) 4421 except ValueError: 4422 continue 4423 self.assertEqual(ctx.get_start_method(), method) 4424 self.assertIs(ctx.get_context(), ctx) 4425 self.assertRaises(ValueError, ctx.set_start_method, 'spawn') 4426 self.assertRaises(ValueError, ctx.set_start_method, None) 4427 self.check_context(ctx) 4428 4429 def test_set_get(self): 4430 multiprocessing.set_forkserver_preload(PRELOAD) 4431 count = 0 4432 old_method = multiprocessing.get_start_method() 4433 try: 4434 for method in ('fork', 'spawn', 'forkserver'): 4435 try: 4436 multiprocessing.set_start_method(method, force=True) 4437 except ValueError: 4438 continue 4439 self.assertEqual(multiprocessing.get_start_method(), method) 4440 ctx = multiprocessing.get_context() 4441 self.assertEqual(ctx.get_start_method(), method) 4442 self.assertTrue(type(ctx).__name__.lower().startswith(method)) 4443 self.assertTrue( 4444 ctx.Process.__name__.lower().startswith(method)) 4445 self.check_context(multiprocessing) 4446 count += 1 4447 finally: 4448 multiprocessing.set_start_method(old_method, force=True) 4449 self.assertGreaterEqual(count, 1) 4450 4451 def test_get_all(self): 4452 methods = multiprocessing.get_all_start_methods() 4453 if sys.platform == 'win32': 4454 self.assertEqual(methods, ['spawn']) 4455 else: 4456 self.assertTrue(methods == ['fork', 'spawn'] or 4457 methods == ['fork', 'spawn', 'forkserver']) 4458 4459 def test_preload_resources(self): 4460 if multiprocessing.get_start_method() != 'forkserver': 4461 self.skipTest("test only relevant for 'forkserver' method") 4462 name = os.path.join(os.path.dirname(__file__), 'mp_preload.py') 4463 rc, out, err = test.support.script_helper.assert_python_ok(name) 4464 out = out.decode() 4465 err = err.decode() 4466 if out.rstrip() != 'ok' or err != '': 4467 print(out) 4468 print(err) 4469 self.fail("failed spawning forkserver or grandchild") 4470 4471 4472@unittest.skipIf(sys.platform == "win32", 4473 "test semantics don't make sense on Windows") 4474class TestSemaphoreTracker(unittest.TestCase): 4475 4476 def test_semaphore_tracker(self): 4477 # 4478 # Check that killing process does not leak named semaphores 4479 # 4480 import subprocess 4481 cmd = '''if 1: 4482 import multiprocessing as mp, time, os 4483 mp.set_start_method("spawn") 4484 lock1 = mp.Lock() 4485 lock2 = mp.Lock() 4486 os.write(%d, lock1._semlock.name.encode("ascii") + b"\\n") 4487 os.write(%d, lock2._semlock.name.encode("ascii") + b"\\n") 4488 time.sleep(10) 4489 ''' 4490 r, w = os.pipe() 4491 p = subprocess.Popen([sys.executable, 4492 '-E', '-c', cmd % (w, w)], 4493 pass_fds=[w], 4494 stderr=subprocess.PIPE) 4495 os.close(w) 4496 with open(r, 'rb', closefd=True) as f: 4497 name1 = f.readline().rstrip().decode('ascii') 4498 name2 = f.readline().rstrip().decode('ascii') 4499 _multiprocessing.sem_unlink(name1) 4500 p.terminate() 4501 p.wait() 4502 time.sleep(2.0) 4503 with self.assertRaises(OSError) as ctx: 4504 _multiprocessing.sem_unlink(name2) 4505 # docs say it should be ENOENT, but OSX seems to give EINVAL 4506 self.assertIn(ctx.exception.errno, (errno.ENOENT, errno.EINVAL)) 4507 err = p.stderr.read().decode('utf-8') 4508 p.stderr.close() 4509 expected = 'semaphore_tracker: There appear to be 2 leaked semaphores' 4510 self.assertRegex(err, expected) 4511 self.assertRegex(err, r'semaphore_tracker: %r: \[Errno' % name1) 4512 4513 def check_semaphore_tracker_death(self, signum, should_die): 4514 # bpo-31310: if the semaphore tracker process has died, it should 4515 # be restarted implicitly. 4516 from multiprocessing.semaphore_tracker import _semaphore_tracker 4517 _semaphore_tracker.ensure_running() 4518 pid = _semaphore_tracker._pid 4519 os.kill(pid, signum) 4520 time.sleep(1.0) # give it time to die 4521 4522 ctx = multiprocessing.get_context("spawn") 4523 with contextlib.ExitStack() as stack: 4524 if should_die: 4525 stack.enter_context(self.assertWarnsRegex( 4526 UserWarning, 4527 "semaphore_tracker: process died")) 4528 sem = ctx.Semaphore() 4529 sem.acquire() 4530 sem.release() 4531 wr = weakref.ref(sem) 4532 # ensure `sem` gets collected, which triggers communication with 4533 # the semaphore tracker 4534 del sem 4535 gc.collect() 4536 self.assertIsNone(wr()) 4537 4538 def test_semaphore_tracker_sigint(self): 4539 # Catchable signal (ignored by semaphore tracker) 4540 self.check_semaphore_tracker_death(signal.SIGINT, False) 4541 4542 def test_semaphore_tracker_sigkill(self): 4543 # Uncatchable signal. 4544 self.check_semaphore_tracker_death(signal.SIGKILL, True) 4545 4546 4547class TestSimpleQueue(unittest.TestCase): 4548 4549 @classmethod 4550 def _test_empty(cls, queue, child_can_start, parent_can_continue): 4551 child_can_start.wait() 4552 # issue 30301, could fail under spawn and forkserver 4553 try: 4554 queue.put(queue.empty()) 4555 queue.put(queue.empty()) 4556 finally: 4557 parent_can_continue.set() 4558 4559 def test_empty(self): 4560 queue = multiprocessing.SimpleQueue() 4561 child_can_start = multiprocessing.Event() 4562 parent_can_continue = multiprocessing.Event() 4563 4564 proc = multiprocessing.Process( 4565 target=self._test_empty, 4566 args=(queue, child_can_start, parent_can_continue) 4567 ) 4568 proc.daemon = True 4569 proc.start() 4570 4571 self.assertTrue(queue.empty()) 4572 4573 child_can_start.set() 4574 parent_can_continue.wait() 4575 4576 self.assertFalse(queue.empty()) 4577 self.assertEqual(queue.get(), True) 4578 self.assertEqual(queue.get(), False) 4579 self.assertTrue(queue.empty()) 4580 4581 proc.join() 4582 4583 4584class TestSyncManagerTypes(unittest.TestCase): 4585 """Test all the types which can be shared between a parent and a 4586 child process by using a manager which acts as an intermediary 4587 between them. 4588 4589 In the following unit-tests the base type is created in the parent 4590 process, the @classmethod represents the worker process and the 4591 shared object is readable and editable between the two. 4592 4593 # The child. 4594 @classmethod 4595 def _test_list(cls, obj): 4596 assert obj[0] == 5 4597 assert obj.append(6) 4598 4599 # The parent. 4600 def test_list(self): 4601 o = self.manager.list() 4602 o.append(5) 4603 self.run_worker(self._test_list, o) 4604 assert o[1] == 6 4605 """ 4606 manager_class = multiprocessing.managers.SyncManager 4607 4608 def setUp(self): 4609 self.manager = self.manager_class() 4610 self.manager.start() 4611 self.proc = None 4612 4613 def tearDown(self): 4614 if self.proc is not None and self.proc.is_alive(): 4615 self.proc.terminate() 4616 self.proc.join() 4617 self.manager.shutdown() 4618 self.manager = None 4619 self.proc = None 4620 4621 @classmethod 4622 def setUpClass(cls): 4623 support.reap_children() 4624 4625 tearDownClass = setUpClass 4626 4627 def wait_proc_exit(self): 4628 # Only the manager process should be returned by active_children() 4629 # but this can take a bit on slow machines, so wait a few seconds 4630 # if there are other children too (see #17395). 4631 join_process(self.proc) 4632 start_time = time.monotonic() 4633 t = 0.01 4634 while len(multiprocessing.active_children()) > 1: 4635 time.sleep(t) 4636 t *= 2 4637 dt = time.monotonic() - start_time 4638 if dt >= 5.0: 4639 test.support.environment_altered = True 4640 print("Warning -- multiprocessing.Manager still has %s active " 4641 "children after %s seconds" 4642 % (multiprocessing.active_children(), dt), 4643 file=sys.stderr) 4644 break 4645 4646 def run_worker(self, worker, obj): 4647 self.proc = multiprocessing.Process(target=worker, args=(obj, )) 4648 self.proc.daemon = True 4649 self.proc.start() 4650 self.wait_proc_exit() 4651 self.assertEqual(self.proc.exitcode, 0) 4652 4653 @classmethod 4654 def _test_queue(cls, obj): 4655 assert obj.qsize() == 2 4656 assert obj.full() 4657 assert not obj.empty() 4658 assert obj.get() == 5 4659 assert not obj.empty() 4660 assert obj.get() == 6 4661 assert obj.empty() 4662 4663 def test_queue(self, qname="Queue"): 4664 o = getattr(self.manager, qname)(2) 4665 o.put(5) 4666 o.put(6) 4667 self.run_worker(self._test_queue, o) 4668 assert o.empty() 4669 assert not o.full() 4670 4671 def test_joinable_queue(self): 4672 self.test_queue("JoinableQueue") 4673 4674 @classmethod 4675 def _test_event(cls, obj): 4676 assert obj.is_set() 4677 obj.wait() 4678 obj.clear() 4679 obj.wait(0.001) 4680 4681 def test_event(self): 4682 o = self.manager.Event() 4683 o.set() 4684 self.run_worker(self._test_event, o) 4685 assert not o.is_set() 4686 o.wait(0.001) 4687 4688 @classmethod 4689 def _test_lock(cls, obj): 4690 obj.acquire() 4691 4692 def test_lock(self, lname="Lock"): 4693 o = getattr(self.manager, lname)() 4694 self.run_worker(self._test_lock, o) 4695 o.release() 4696 self.assertRaises(RuntimeError, o.release) # already released 4697 4698 @classmethod 4699 def _test_rlock(cls, obj): 4700 obj.acquire() 4701 obj.release() 4702 4703 def test_rlock(self, lname="Lock"): 4704 o = getattr(self.manager, lname)() 4705 self.run_worker(self._test_rlock, o) 4706 4707 @classmethod 4708 def _test_semaphore(cls, obj): 4709 obj.acquire() 4710 4711 def test_semaphore(self, sname="Semaphore"): 4712 o = getattr(self.manager, sname)() 4713 self.run_worker(self._test_semaphore, o) 4714 o.release() 4715 4716 def test_bounded_semaphore(self): 4717 self.test_semaphore(sname="BoundedSemaphore") 4718 4719 @classmethod 4720 def _test_condition(cls, obj): 4721 obj.acquire() 4722 obj.release() 4723 4724 def test_condition(self): 4725 o = self.manager.Condition() 4726 self.run_worker(self._test_condition, o) 4727 4728 @classmethod 4729 def _test_barrier(cls, obj): 4730 assert obj.parties == 5 4731 obj.reset() 4732 4733 def test_barrier(self): 4734 o = self.manager.Barrier(5) 4735 self.run_worker(self._test_barrier, o) 4736 4737 @classmethod 4738 def _test_pool(cls, obj): 4739 # TODO: fix https://bugs.python.org/issue35919 4740 with obj: 4741 pass 4742 4743 def test_pool(self): 4744 o = self.manager.Pool(processes=4) 4745 self.run_worker(self._test_pool, o) 4746 4747 @classmethod 4748 def _test_list(cls, obj): 4749 assert obj[0] == 5 4750 assert obj.count(5) == 1 4751 assert obj.index(5) == 0 4752 obj.sort() 4753 obj.reverse() 4754 for x in obj: 4755 pass 4756 assert len(obj) == 1 4757 assert obj.pop(0) == 5 4758 4759 def test_list(self): 4760 o = self.manager.list() 4761 o.append(5) 4762 self.run_worker(self._test_list, o) 4763 assert not o 4764 self.assertEqual(len(o), 0) 4765 4766 @classmethod 4767 def _test_dict(cls, obj): 4768 assert len(obj) == 1 4769 assert obj['foo'] == 5 4770 assert obj.get('foo') == 5 4771 assert list(obj.items()) == [('foo', 5)] 4772 assert list(obj.keys()) == ['foo'] 4773 assert list(obj.values()) == [5] 4774 assert obj.copy() == {'foo': 5} 4775 assert obj.popitem() == ('foo', 5) 4776 4777 def test_dict(self): 4778 o = self.manager.dict() 4779 o['foo'] = 5 4780 self.run_worker(self._test_dict, o) 4781 assert not o 4782 self.assertEqual(len(o), 0) 4783 4784 @classmethod 4785 def _test_value(cls, obj): 4786 assert obj.value == 1 4787 assert obj.get() == 1 4788 obj.set(2) 4789 4790 def test_value(self): 4791 o = self.manager.Value('i', 1) 4792 self.run_worker(self._test_value, o) 4793 self.assertEqual(o.value, 2) 4794 self.assertEqual(o.get(), 2) 4795 4796 @classmethod 4797 def _test_array(cls, obj): 4798 assert obj[0] == 0 4799 assert obj[1] == 1 4800 assert len(obj) == 2 4801 assert list(obj) == [0, 1] 4802 4803 def test_array(self): 4804 o = self.manager.Array('i', [0, 1]) 4805 self.run_worker(self._test_array, o) 4806 4807 @classmethod 4808 def _test_namespace(cls, obj): 4809 assert obj.x == 0 4810 assert obj.y == 1 4811 4812 def test_namespace(self): 4813 o = self.manager.Namespace() 4814 o.x = 0 4815 o.y = 1 4816 self.run_worker(self._test_namespace, o) 4817 4818 4819# 4820# Mixins 4821# 4822 4823class BaseMixin(object): 4824 @classmethod 4825 def setUpClass(cls): 4826 cls.dangling = (multiprocessing.process._dangling.copy(), 4827 threading._dangling.copy()) 4828 4829 @classmethod 4830 def tearDownClass(cls): 4831 # bpo-26762: Some multiprocessing objects like Pool create reference 4832 # cycles. Trigger a garbage collection to break these cycles. 4833 test.support.gc_collect() 4834 4835 processes = set(multiprocessing.process._dangling) - set(cls.dangling[0]) 4836 if processes: 4837 test.support.environment_altered = True 4838 print('Warning -- Dangling processes: %s' % processes, 4839 file=sys.stderr) 4840 processes = None 4841 4842 threads = set(threading._dangling) - set(cls.dangling[1]) 4843 if threads: 4844 test.support.environment_altered = True 4845 print('Warning -- Dangling threads: %s' % threads, 4846 file=sys.stderr) 4847 threads = None 4848 4849 4850class ProcessesMixin(BaseMixin): 4851 TYPE = 'processes' 4852 Process = multiprocessing.Process 4853 connection = multiprocessing.connection 4854 current_process = staticmethod(multiprocessing.current_process) 4855 active_children = staticmethod(multiprocessing.active_children) 4856 Pool = staticmethod(multiprocessing.Pool) 4857 Pipe = staticmethod(multiprocessing.Pipe) 4858 Queue = staticmethod(multiprocessing.Queue) 4859 JoinableQueue = staticmethod(multiprocessing.JoinableQueue) 4860 Lock = staticmethod(multiprocessing.Lock) 4861 RLock = staticmethod(multiprocessing.RLock) 4862 Semaphore = staticmethod(multiprocessing.Semaphore) 4863 BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore) 4864 Condition = staticmethod(multiprocessing.Condition) 4865 Event = staticmethod(multiprocessing.Event) 4866 Barrier = staticmethod(multiprocessing.Barrier) 4867 Value = staticmethod(multiprocessing.Value) 4868 Array = staticmethod(multiprocessing.Array) 4869 RawValue = staticmethod(multiprocessing.RawValue) 4870 RawArray = staticmethod(multiprocessing.RawArray) 4871 4872 4873class ManagerMixin(BaseMixin): 4874 TYPE = 'manager' 4875 Process = multiprocessing.Process 4876 Queue = property(operator.attrgetter('manager.Queue')) 4877 JoinableQueue = property(operator.attrgetter('manager.JoinableQueue')) 4878 Lock = property(operator.attrgetter('manager.Lock')) 4879 RLock = property(operator.attrgetter('manager.RLock')) 4880 Semaphore = property(operator.attrgetter('manager.Semaphore')) 4881 BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore')) 4882 Condition = property(operator.attrgetter('manager.Condition')) 4883 Event = property(operator.attrgetter('manager.Event')) 4884 Barrier = property(operator.attrgetter('manager.Barrier')) 4885 Value = property(operator.attrgetter('manager.Value')) 4886 Array = property(operator.attrgetter('manager.Array')) 4887 list = property(operator.attrgetter('manager.list')) 4888 dict = property(operator.attrgetter('manager.dict')) 4889 Namespace = property(operator.attrgetter('manager.Namespace')) 4890 4891 @classmethod 4892 def Pool(cls, *args, **kwds): 4893 return cls.manager.Pool(*args, **kwds) 4894 4895 @classmethod 4896 def setUpClass(cls): 4897 super().setUpClass() 4898 cls.manager = multiprocessing.Manager() 4899 4900 @classmethod 4901 def tearDownClass(cls): 4902 # only the manager process should be returned by active_children() 4903 # but this can take a bit on slow machines, so wait a few seconds 4904 # if there are other children too (see #17395) 4905 start_time = time.monotonic() 4906 t = 0.01 4907 while len(multiprocessing.active_children()) > 1: 4908 time.sleep(t) 4909 t *= 2 4910 dt = time.monotonic() - start_time 4911 if dt >= 5.0: 4912 test.support.environment_altered = True 4913 print("Warning -- multiprocessing.Manager still has %s active " 4914 "children after %s seconds" 4915 % (multiprocessing.active_children(), dt), 4916 file=sys.stderr) 4917 break 4918 4919 gc.collect() # do garbage collection 4920 if cls.manager._number_of_objects() != 0: 4921 # This is not really an error since some tests do not 4922 # ensure that all processes which hold a reference to a 4923 # managed object have been joined. 4924 test.support.environment_altered = True 4925 print('Warning -- Shared objects which still exist at manager ' 4926 'shutdown:') 4927 print(cls.manager._debug_info()) 4928 cls.manager.shutdown() 4929 cls.manager.join() 4930 cls.manager = None 4931 4932 super().tearDownClass() 4933 4934 4935class ThreadsMixin(BaseMixin): 4936 TYPE = 'threads' 4937 Process = multiprocessing.dummy.Process 4938 connection = multiprocessing.dummy.connection 4939 current_process = staticmethod(multiprocessing.dummy.current_process) 4940 active_children = staticmethod(multiprocessing.dummy.active_children) 4941 Pool = staticmethod(multiprocessing.dummy.Pool) 4942 Pipe = staticmethod(multiprocessing.dummy.Pipe) 4943 Queue = staticmethod(multiprocessing.dummy.Queue) 4944 JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue) 4945 Lock = staticmethod(multiprocessing.dummy.Lock) 4946 RLock = staticmethod(multiprocessing.dummy.RLock) 4947 Semaphore = staticmethod(multiprocessing.dummy.Semaphore) 4948 BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore) 4949 Condition = staticmethod(multiprocessing.dummy.Condition) 4950 Event = staticmethod(multiprocessing.dummy.Event) 4951 Barrier = staticmethod(multiprocessing.dummy.Barrier) 4952 Value = staticmethod(multiprocessing.dummy.Value) 4953 Array = staticmethod(multiprocessing.dummy.Array) 4954 4955# 4956# Functions used to create test cases from the base ones in this module 4957# 4958 4959def install_tests_in_module_dict(remote_globs, start_method): 4960 __module__ = remote_globs['__name__'] 4961 local_globs = globals() 4962 ALL_TYPES = {'processes', 'threads', 'manager'} 4963 4964 for name, base in local_globs.items(): 4965 if not isinstance(base, type): 4966 continue 4967 if issubclass(base, BaseTestCase): 4968 if base is BaseTestCase: 4969 continue 4970 assert set(base.ALLOWED_TYPES) <= ALL_TYPES, base.ALLOWED_TYPES 4971 for type_ in base.ALLOWED_TYPES: 4972 newname = 'With' + type_.capitalize() + name[1:] 4973 Mixin = local_globs[type_.capitalize() + 'Mixin'] 4974 class Temp(base, Mixin, unittest.TestCase): 4975 pass 4976 Temp.__name__ = Temp.__qualname__ = newname 4977 Temp.__module__ = __module__ 4978 remote_globs[newname] = Temp 4979 elif issubclass(base, unittest.TestCase): 4980 class Temp(base, object): 4981 pass 4982 Temp.__name__ = Temp.__qualname__ = name 4983 Temp.__module__ = __module__ 4984 remote_globs[name] = Temp 4985 4986 dangling = [None, None] 4987 old_start_method = [None] 4988 4989 def setUpModule(): 4990 multiprocessing.set_forkserver_preload(PRELOAD) 4991 multiprocessing.process._cleanup() 4992 dangling[0] = multiprocessing.process._dangling.copy() 4993 dangling[1] = threading._dangling.copy() 4994 old_start_method[0] = multiprocessing.get_start_method(allow_none=True) 4995 try: 4996 multiprocessing.set_start_method(start_method, force=True) 4997 except ValueError: 4998 raise unittest.SkipTest(start_method + 4999 ' start method not supported') 5000 5001 if sys.platform.startswith("linux"): 5002 try: 5003 lock = multiprocessing.RLock() 5004 except OSError: 5005 raise unittest.SkipTest("OSError raises on RLock creation, " 5006 "see issue 3111!") 5007 check_enough_semaphores() 5008 util.get_temp_dir() # creates temp directory 5009 multiprocessing.get_logger().setLevel(LOG_LEVEL) 5010 5011 def tearDownModule(): 5012 need_sleep = False 5013 5014 # bpo-26762: Some multiprocessing objects like Pool create reference 5015 # cycles. Trigger a garbage collection to break these cycles. 5016 test.support.gc_collect() 5017 5018 multiprocessing.set_start_method(old_start_method[0], force=True) 5019 # pause a bit so we don't get warning about dangling threads/processes 5020 processes = set(multiprocessing.process._dangling) - set(dangling[0]) 5021 if processes: 5022 need_sleep = True 5023 test.support.environment_altered = True 5024 print('Warning -- Dangling processes: %s' % processes, 5025 file=sys.stderr) 5026 processes = None 5027 5028 threads = set(threading._dangling) - set(dangling[1]) 5029 if threads: 5030 need_sleep = True 5031 test.support.environment_altered = True 5032 print('Warning -- Dangling threads: %s' % threads, 5033 file=sys.stderr) 5034 threads = None 5035 5036 # Sleep 500 ms to give time to child processes to complete. 5037 if need_sleep: 5038 time.sleep(0.5) 5039 multiprocessing.process._cleanup() 5040 test.support.gc_collect() 5041 5042 remote_globs['setUpModule'] = setUpModule 5043 remote_globs['tearDownModule'] = tearDownModule 5044