1# 2# Unit tests for the multiprocessing package 3# 4 5import unittest 6import Queue 7import time 8import sys 9import os 10import gc 11import signal 12import array 13import socket 14import random 15import logging 16import errno 17import weakref 18import test.script_helper 19from test import support 20from StringIO import StringIO 21_multiprocessing = support.import_module('_multiprocessing') 22# import threading after _multiprocessing to raise a more relevant error 23# message: "No module named _multiprocessing". _multiprocessing is not compiled 24# without thread support. 25import threading 26 27# Work around broken sem_open implementations 28support.import_module('multiprocessing.synchronize') 29 30import multiprocessing.dummy 31import multiprocessing.connection 32import multiprocessing.managers 33import multiprocessing.heap 34import multiprocessing.pool 35 36from multiprocessing import util 37 38try: 39 from multiprocessing import reduction 40 HAS_REDUCTION = True 41except ImportError: 42 HAS_REDUCTION = False 43 44try: 45 from multiprocessing.sharedctypes import Value, copy 46 HAS_SHAREDCTYPES = True 47except ImportError: 48 HAS_SHAREDCTYPES = False 49 50try: 51 import msvcrt 52except ImportError: 53 msvcrt = None 54 55# 56# 57# 58 59latin = str 60 61# 62# Constants 63# 64 65LOG_LEVEL = util.SUBWARNING 66#LOG_LEVEL = logging.DEBUG 67 68DELTA = 0.1 69CHECK_TIMINGS = False # making true makes tests take a lot longer 70 # and can sometimes cause some non-serious 71 # failures because some calls block a bit 72 # longer than expected 73if CHECK_TIMINGS: 74 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4 75else: 76 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1 77 78HAVE_GETVALUE = not getattr(_multiprocessing, 79 'HAVE_BROKEN_SEM_GETVALUE', False) 80 81WIN32 = (sys.platform == "win32") 82 83try: 84 MAXFD = os.sysconf("SC_OPEN_MAX") 85except: 86 MAXFD = 256 87 88# 89# Some tests require ctypes 90# 91 92try: 93 from ctypes import Structure, c_int, c_double 94except 
ImportError: 95 Structure = object 96 c_int = c_double = None 97 98 99def check_enough_semaphores(): 100 """Check that the system supports enough semaphores to run the test.""" 101 # minimum number of semaphores available according to POSIX 102 nsems_min = 256 103 try: 104 nsems = os.sysconf("SC_SEM_NSEMS_MAX") 105 except (AttributeError, ValueError): 106 # sysconf not available or setting not available 107 return 108 if nsems == -1 or nsems >= nsems_min: 109 return 110 raise unittest.SkipTest("The OS doesn't support enough semaphores " 111 "to run the test (required: %d)." % nsems_min) 112 113 114# 115# Creates a wrapper for a function which records the time it takes to finish 116# 117 118class TimingWrapper(object): 119 120 def __init__(self, func): 121 self.func = func 122 self.elapsed = None 123 124 def __call__(self, *args, **kwds): 125 t = time.time() 126 try: 127 return self.func(*args, **kwds) 128 finally: 129 self.elapsed = time.time() - t 130 131# 132# Base class for test cases 133# 134 135class BaseTestCase(object): 136 137 ALLOWED_TYPES = ('processes', 'manager', 'threads') 138 139 def assertTimingAlmostEqual(self, a, b): 140 if CHECK_TIMINGS: 141 self.assertAlmostEqual(a, b, 1) 142 143 def assertReturnsIfImplemented(self, value, func, *args): 144 try: 145 res = func(*args) 146 except NotImplementedError: 147 pass 148 else: 149 return self.assertEqual(value, res) 150 151 # For the sanity of Windows users, rather than crashing or freezing in 152 # multiple ways. 
153 def __reduce__(self, *args): 154 raise NotImplementedError("shouldn't try to pickle a test case") 155 156 __reduce_ex__ = __reduce__ 157 158# 159# Return the value of a semaphore 160# 161 162def get_value(self): 163 try: 164 return self.get_value() 165 except AttributeError: 166 try: 167 return self._Semaphore__value 168 except AttributeError: 169 try: 170 return self._value 171 except AttributeError: 172 raise NotImplementedError 173 174# 175# Testcases 176# 177 178class DummyCallable(object): 179 def __call__(self, q, c): 180 assert isinstance(c, DummyCallable) 181 q.put(5) 182 183 184class _TestProcess(BaseTestCase): 185 186 ALLOWED_TYPES = ('processes', 'threads') 187 188 def test_current(self): 189 if self.TYPE == 'threads': 190 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 191 192 current = self.current_process() 193 authkey = current.authkey 194 195 self.assertTrue(current.is_alive()) 196 self.assertTrue(not current.daemon) 197 self.assertIsInstance(authkey, bytes) 198 self.assertTrue(len(authkey) > 0) 199 self.assertEqual(current.ident, os.getpid()) 200 self.assertEqual(current.exitcode, None) 201 202 @classmethod 203 def _test(cls, q, *args, **kwds): 204 current = cls.current_process() 205 q.put(args) 206 q.put(kwds) 207 q.put(current.name) 208 if cls.TYPE != 'threads': 209 q.put(bytes(current.authkey)) 210 q.put(current.pid) 211 212 def test_process(self): 213 q = self.Queue(1) 214 e = self.Event() 215 args = (q, 1, 2) 216 kwargs = {'hello':23, 'bye':2.54} 217 name = 'SomeProcess' 218 p = self.Process( 219 target=self._test, args=args, kwargs=kwargs, name=name 220 ) 221 p.daemon = True 222 current = self.current_process() 223 224 if self.TYPE != 'threads': 225 self.assertEqual(p.authkey, current.authkey) 226 self.assertEqual(p.is_alive(), False) 227 self.assertEqual(p.daemon, True) 228 self.assertNotIn(p, self.active_children()) 229 self.assertTrue(type(self.active_children()) is list) 230 self.assertEqual(p.exitcode, None) 231 232 
p.start() 233 234 self.assertEqual(p.exitcode, None) 235 self.assertEqual(p.is_alive(), True) 236 self.assertIn(p, self.active_children()) 237 238 self.assertEqual(q.get(), args[1:]) 239 self.assertEqual(q.get(), kwargs) 240 self.assertEqual(q.get(), p.name) 241 if self.TYPE != 'threads': 242 self.assertEqual(q.get(), current.authkey) 243 self.assertEqual(q.get(), p.pid) 244 245 p.join() 246 247 self.assertEqual(p.exitcode, 0) 248 self.assertEqual(p.is_alive(), False) 249 self.assertNotIn(p, self.active_children()) 250 251 @classmethod 252 def _test_terminate(cls): 253 time.sleep(1000) 254 255 def test_terminate(self): 256 if self.TYPE == 'threads': 257 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 258 259 p = self.Process(target=self._test_terminate) 260 p.daemon = True 261 p.start() 262 263 self.assertEqual(p.is_alive(), True) 264 self.assertIn(p, self.active_children()) 265 self.assertEqual(p.exitcode, None) 266 267 p.terminate() 268 269 join = TimingWrapper(p.join) 270 self.assertEqual(join(), None) 271 self.assertTimingAlmostEqual(join.elapsed, 0.0) 272 273 self.assertEqual(p.is_alive(), False) 274 self.assertNotIn(p, self.active_children()) 275 276 p.join() 277 278 # XXX sometimes get p.exitcode == 0 on Windows ... 
279 #self.assertEqual(p.exitcode, -signal.SIGTERM) 280 281 def test_cpu_count(self): 282 try: 283 cpus = multiprocessing.cpu_count() 284 except NotImplementedError: 285 cpus = 1 286 self.assertTrue(type(cpus) is int) 287 self.assertTrue(cpus >= 1) 288 289 def test_active_children(self): 290 self.assertEqual(type(self.active_children()), list) 291 292 p = self.Process(target=time.sleep, args=(DELTA,)) 293 self.assertNotIn(p, self.active_children()) 294 295 p.daemon = True 296 p.start() 297 self.assertIn(p, self.active_children()) 298 299 p.join() 300 self.assertNotIn(p, self.active_children()) 301 302 @classmethod 303 def _test_recursion(cls, wconn, id): 304 from multiprocessing import forking 305 wconn.send(id) 306 if len(id) < 2: 307 for i in range(2): 308 p = cls.Process( 309 target=cls._test_recursion, args=(wconn, id+[i]) 310 ) 311 p.start() 312 p.join() 313 314 def test_recursion(self): 315 rconn, wconn = self.Pipe(duplex=False) 316 self._test_recursion(wconn, []) 317 318 time.sleep(DELTA) 319 result = [] 320 while rconn.poll(): 321 result.append(rconn.recv()) 322 323 expected = [ 324 [], 325 [0], 326 [0, 0], 327 [0, 1], 328 [1], 329 [1, 0], 330 [1, 1] 331 ] 332 self.assertEqual(result, expected) 333 334 @classmethod 335 def _test_sys_exit(cls, reason, testfn): 336 sys.stderr = open(testfn, 'w') 337 sys.exit(reason) 338 339 def test_sys_exit(self): 340 # See Issue 13854 341 if self.TYPE == 'threads': 342 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 343 344 testfn = support.TESTFN 345 self.addCleanup(support.unlink, testfn) 346 347 for reason, code in (([1, 2, 3], 1), ('ignore this', 1)): 348 p = self.Process(target=self._test_sys_exit, args=(reason, testfn)) 349 p.daemon = True 350 p.start() 351 p.join(5) 352 self.assertEqual(p.exitcode, code) 353 354 with open(testfn, 'r') as f: 355 self.assertEqual(f.read().rstrip(), str(reason)) 356 357 for reason in (True, False, 8): 358 p = self.Process(target=sys.exit, args=(reason,)) 359 p.daemon = 
True 360 p.start() 361 p.join(5) 362 self.assertEqual(p.exitcode, reason) 363 364 def test_lose_target_ref(self): 365 c = DummyCallable() 366 wr = weakref.ref(c) 367 q = self.Queue() 368 p = self.Process(target=c, args=(q, c)) 369 del c 370 p.start() 371 p.join() 372 self.assertIs(wr(), None) 373 self.assertEqual(q.get(), 5) 374 375 376# 377# 378# 379 380class _UpperCaser(multiprocessing.Process): 381 382 def __init__(self): 383 multiprocessing.Process.__init__(self) 384 self.child_conn, self.parent_conn = multiprocessing.Pipe() 385 386 def run(self): 387 self.parent_conn.close() 388 for s in iter(self.child_conn.recv, None): 389 self.child_conn.send(s.upper()) 390 self.child_conn.close() 391 392 def submit(self, s): 393 assert type(s) is str 394 self.parent_conn.send(s) 395 return self.parent_conn.recv() 396 397 def stop(self): 398 self.parent_conn.send(None) 399 self.parent_conn.close() 400 self.child_conn.close() 401 402class _TestSubclassingProcess(BaseTestCase): 403 404 ALLOWED_TYPES = ('processes',) 405 406 def test_subclassing(self): 407 uppercaser = _UpperCaser() 408 uppercaser.daemon = True 409 uppercaser.start() 410 self.assertEqual(uppercaser.submit('hello'), 'HELLO') 411 self.assertEqual(uppercaser.submit('world'), 'WORLD') 412 uppercaser.stop() 413 uppercaser.join() 414 415# 416# 417# 418 419def queue_empty(q): 420 if hasattr(q, 'empty'): 421 return q.empty() 422 else: 423 return q.qsize() == 0 424 425def queue_full(q, maxsize): 426 if hasattr(q, 'full'): 427 return q.full() 428 else: 429 return q.qsize() == maxsize 430 431 432class _TestQueue(BaseTestCase): 433 434 435 @classmethod 436 def _test_put(cls, queue, child_can_start, parent_can_continue): 437 child_can_start.wait() 438 for i in range(6): 439 queue.get() 440 parent_can_continue.set() 441 442 def test_put(self): 443 MAXSIZE = 6 444 queue = self.Queue(maxsize=MAXSIZE) 445 child_can_start = self.Event() 446 parent_can_continue = self.Event() 447 448 proc = self.Process( 449 
target=self._test_put, 450 args=(queue, child_can_start, parent_can_continue) 451 ) 452 proc.daemon = True 453 proc.start() 454 455 self.assertEqual(queue_empty(queue), True) 456 self.assertEqual(queue_full(queue, MAXSIZE), False) 457 458 queue.put(1) 459 queue.put(2, True) 460 queue.put(3, True, None) 461 queue.put(4, False) 462 queue.put(5, False, None) 463 queue.put_nowait(6) 464 465 # the values may be in buffer but not yet in pipe so sleep a bit 466 time.sleep(DELTA) 467 468 self.assertEqual(queue_empty(queue), False) 469 self.assertEqual(queue_full(queue, MAXSIZE), True) 470 471 put = TimingWrapper(queue.put) 472 put_nowait = TimingWrapper(queue.put_nowait) 473 474 self.assertRaises(Queue.Full, put, 7, False) 475 self.assertTimingAlmostEqual(put.elapsed, 0) 476 477 self.assertRaises(Queue.Full, put, 7, False, None) 478 self.assertTimingAlmostEqual(put.elapsed, 0) 479 480 self.assertRaises(Queue.Full, put_nowait, 7) 481 self.assertTimingAlmostEqual(put_nowait.elapsed, 0) 482 483 self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1) 484 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1) 485 486 self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2) 487 self.assertTimingAlmostEqual(put.elapsed, 0) 488 489 self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3) 490 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3) 491 492 child_can_start.set() 493 parent_can_continue.wait() 494 495 self.assertEqual(queue_empty(queue), True) 496 self.assertEqual(queue_full(queue, MAXSIZE), False) 497 498 proc.join() 499 500 @classmethod 501 def _test_get(cls, queue, child_can_start, parent_can_continue): 502 child_can_start.wait() 503 #queue.put(1) 504 queue.put(2) 505 queue.put(3) 506 queue.put(4) 507 queue.put(5) 508 parent_can_continue.set() 509 510 def test_get(self): 511 queue = self.Queue() 512 child_can_start = self.Event() 513 parent_can_continue = self.Event() 514 515 proc = self.Process( 516 target=self._test_get, 517 args=(queue, child_can_start, 
parent_can_continue) 518 ) 519 proc.daemon = True 520 proc.start() 521 522 self.assertEqual(queue_empty(queue), True) 523 524 child_can_start.set() 525 parent_can_continue.wait() 526 527 time.sleep(DELTA) 528 self.assertEqual(queue_empty(queue), False) 529 530 # Hangs unexpectedly, remove for now 531 #self.assertEqual(queue.get(), 1) 532 self.assertEqual(queue.get(True, None), 2) 533 self.assertEqual(queue.get(True), 3) 534 self.assertEqual(queue.get(timeout=1), 4) 535 self.assertEqual(queue.get_nowait(), 5) 536 537 self.assertEqual(queue_empty(queue), True) 538 539 get = TimingWrapper(queue.get) 540 get_nowait = TimingWrapper(queue.get_nowait) 541 542 self.assertRaises(Queue.Empty, get, False) 543 self.assertTimingAlmostEqual(get.elapsed, 0) 544 545 self.assertRaises(Queue.Empty, get, False, None) 546 self.assertTimingAlmostEqual(get.elapsed, 0) 547 548 self.assertRaises(Queue.Empty, get_nowait) 549 self.assertTimingAlmostEqual(get_nowait.elapsed, 0) 550 551 self.assertRaises(Queue.Empty, get, True, TIMEOUT1) 552 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1) 553 554 self.assertRaises(Queue.Empty, get, False, TIMEOUT2) 555 self.assertTimingAlmostEqual(get.elapsed, 0) 556 557 self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3) 558 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3) 559 560 proc.join() 561 562 @classmethod 563 def _test_fork(cls, queue): 564 for i in range(10, 20): 565 queue.put(i) 566 # note that at this point the items may only be buffered, so the 567 # process cannot shutdown until the feeder thread has finished 568 # pushing items onto the pipe. 569 570 def test_fork(self): 571 # Old versions of Queue would fail to create a new feeder 572 # thread for a forked process if the original process had its 573 # own feeder thread. This test checks that this no longer 574 # happens. 
575 576 queue = self.Queue() 577 578 # put items on queue so that main process starts a feeder thread 579 for i in range(10): 580 queue.put(i) 581 582 # wait to make sure thread starts before we fork a new process 583 time.sleep(DELTA) 584 585 # fork process 586 p = self.Process(target=self._test_fork, args=(queue,)) 587 p.daemon = True 588 p.start() 589 590 # check that all expected items are in the queue 591 for i in range(20): 592 self.assertEqual(queue.get(), i) 593 self.assertRaises(Queue.Empty, queue.get, False) 594 595 p.join() 596 597 def test_qsize(self): 598 q = self.Queue() 599 try: 600 self.assertEqual(q.qsize(), 0) 601 except NotImplementedError: 602 self.skipTest('qsize method not implemented') 603 q.put(1) 604 self.assertEqual(q.qsize(), 1) 605 q.put(5) 606 self.assertEqual(q.qsize(), 2) 607 q.get() 608 self.assertEqual(q.qsize(), 1) 609 q.get() 610 self.assertEqual(q.qsize(), 0) 611 612 @classmethod 613 def _test_task_done(cls, q): 614 for obj in iter(q.get, None): 615 time.sleep(DELTA) 616 q.task_done() 617 618 def test_task_done(self): 619 queue = self.JoinableQueue() 620 621 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'): 622 self.skipTest("requires 'queue.task_done()' method") 623 624 workers = [self.Process(target=self._test_task_done, args=(queue,)) 625 for i in xrange(4)] 626 627 for p in workers: 628 p.daemon = True 629 p.start() 630 631 for i in xrange(10): 632 queue.put(i) 633 634 queue.join() 635 636 for p in workers: 637 queue.put(None) 638 639 for p in workers: 640 p.join() 641 642 def test_no_import_lock_contention(self): 643 with support.temp_cwd(): 644 module_name = 'imported_by_an_imported_module' 645 with open(module_name + '.py', 'w') as f: 646 f.write("""if 1: 647 import multiprocessing 648 649 q = multiprocessing.Queue() 650 q.put('knock knock') 651 q.get(timeout=3) 652 q.close() 653 """) 654 655 with support.DirsOnSysPath(os.getcwd()): 656 try: 657 __import__(module_name) 658 except Queue.Empty: 659 
self.fail("Probable regression on import lock contention;" 660 " see Issue #22853") 661 662 def test_queue_feeder_donot_stop_onexc(self): 663 # bpo-30414: verify feeder handles exceptions correctly 664 if self.TYPE != 'processes': 665 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 666 667 class NotSerializable(object): 668 def __reduce__(self): 669 raise AttributeError 670 with test.support.captured_stderr(): 671 q = self.Queue() 672 q.put(NotSerializable()) 673 q.put(True) 674 # bpo-30595: use a timeout of 1 second for slow buildbots 675 self.assertTrue(q.get(timeout=1.0)) 676 677 678# 679# 680# 681 682class _TestLock(BaseTestCase): 683 684 def test_lock(self): 685 lock = self.Lock() 686 self.assertEqual(lock.acquire(), True) 687 self.assertEqual(lock.acquire(False), False) 688 self.assertEqual(lock.release(), None) 689 self.assertRaises((ValueError, threading.ThreadError), lock.release) 690 691 def test_rlock(self): 692 lock = self.RLock() 693 self.assertEqual(lock.acquire(), True) 694 self.assertEqual(lock.acquire(), True) 695 self.assertEqual(lock.acquire(), True) 696 self.assertEqual(lock.release(), None) 697 self.assertEqual(lock.release(), None) 698 self.assertEqual(lock.release(), None) 699 self.assertRaises((AssertionError, RuntimeError), lock.release) 700 701 def test_lock_context(self): 702 with self.Lock(): 703 pass 704 705 706class _TestSemaphore(BaseTestCase): 707 708 def _test_semaphore(self, sem): 709 self.assertReturnsIfImplemented(2, get_value, sem) 710 self.assertEqual(sem.acquire(), True) 711 self.assertReturnsIfImplemented(1, get_value, sem) 712 self.assertEqual(sem.acquire(), True) 713 self.assertReturnsIfImplemented(0, get_value, sem) 714 self.assertEqual(sem.acquire(False), False) 715 self.assertReturnsIfImplemented(0, get_value, sem) 716 self.assertEqual(sem.release(), None) 717 self.assertReturnsIfImplemented(1, get_value, sem) 718 self.assertEqual(sem.release(), None) 719 self.assertReturnsIfImplemented(2, get_value, sem) 
720 721 def test_semaphore(self): 722 sem = self.Semaphore(2) 723 self._test_semaphore(sem) 724 self.assertEqual(sem.release(), None) 725 self.assertReturnsIfImplemented(3, get_value, sem) 726 self.assertEqual(sem.release(), None) 727 self.assertReturnsIfImplemented(4, get_value, sem) 728 729 def test_bounded_semaphore(self): 730 sem = self.BoundedSemaphore(2) 731 self._test_semaphore(sem) 732 # Currently fails on OS/X 733 #if HAVE_GETVALUE: 734 # self.assertRaises(ValueError, sem.release) 735 # self.assertReturnsIfImplemented(2, get_value, sem) 736 737 def test_timeout(self): 738 if self.TYPE != 'processes': 739 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 740 741 sem = self.Semaphore(0) 742 acquire = TimingWrapper(sem.acquire) 743 744 self.assertEqual(acquire(False), False) 745 self.assertTimingAlmostEqual(acquire.elapsed, 0.0) 746 747 self.assertEqual(acquire(False, None), False) 748 self.assertTimingAlmostEqual(acquire.elapsed, 0.0) 749 750 self.assertEqual(acquire(False, TIMEOUT1), False) 751 self.assertTimingAlmostEqual(acquire.elapsed, 0) 752 753 self.assertEqual(acquire(True, TIMEOUT2), False) 754 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2) 755 756 self.assertEqual(acquire(timeout=TIMEOUT3), False) 757 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3) 758 759 760class _TestCondition(BaseTestCase): 761 762 @classmethod 763 def f(cls, cond, sleeping, woken, timeout=None): 764 cond.acquire() 765 sleeping.release() 766 cond.wait(timeout) 767 woken.release() 768 cond.release() 769 770 def check_invariant(self, cond): 771 # this is only supposed to succeed when there are no sleepers 772 if self.TYPE == 'processes': 773 try: 774 sleepers = (cond._sleeping_count.get_value() - 775 cond._woken_count.get_value()) 776 self.assertEqual(sleepers, 0) 777 self.assertEqual(cond._wait_semaphore.get_value(), 0) 778 except NotImplementedError: 779 pass 780 781 def test_notify(self): 782 cond = self.Condition() 783 sleeping = 
self.Semaphore(0) 784 woken = self.Semaphore(0) 785 786 p = self.Process(target=self.f, args=(cond, sleeping, woken)) 787 p.daemon = True 788 p.start() 789 790 p = threading.Thread(target=self.f, args=(cond, sleeping, woken)) 791 p.daemon = True 792 p.start() 793 794 # wait for both children to start sleeping 795 sleeping.acquire() 796 sleeping.acquire() 797 798 # check no process/thread has woken up 799 time.sleep(DELTA) 800 self.assertReturnsIfImplemented(0, get_value, woken) 801 802 # wake up one process/thread 803 cond.acquire() 804 cond.notify() 805 cond.release() 806 807 # check one process/thread has woken up 808 time.sleep(DELTA) 809 self.assertReturnsIfImplemented(1, get_value, woken) 810 811 # wake up another 812 cond.acquire() 813 cond.notify() 814 cond.release() 815 816 # check other has woken up 817 time.sleep(DELTA) 818 self.assertReturnsIfImplemented(2, get_value, woken) 819 820 # check state is not mucked up 821 self.check_invariant(cond) 822 p.join() 823 824 def test_notify_all(self): 825 cond = self.Condition() 826 sleeping = self.Semaphore(0) 827 woken = self.Semaphore(0) 828 829 # start some threads/processes which will timeout 830 for i in range(3): 831 p = self.Process(target=self.f, 832 args=(cond, sleeping, woken, TIMEOUT1)) 833 p.daemon = True 834 p.start() 835 836 t = threading.Thread(target=self.f, 837 args=(cond, sleeping, woken, TIMEOUT1)) 838 t.daemon = True 839 t.start() 840 841 # wait for them all to sleep 842 for i in xrange(6): 843 sleeping.acquire() 844 845 # check they have all timed out 846 for i in xrange(6): 847 woken.acquire() 848 self.assertReturnsIfImplemented(0, get_value, woken) 849 850 # check state is not mucked up 851 self.check_invariant(cond) 852 853 # start some more threads/processes 854 for i in range(3): 855 p = self.Process(target=self.f, args=(cond, sleeping, woken)) 856 p.daemon = True 857 p.start() 858 859 t = threading.Thread(target=self.f, args=(cond, sleeping, woken)) 860 t.daemon = True 861 t.start() 862 
863 # wait for them to all sleep 864 for i in xrange(6): 865 sleeping.acquire() 866 867 # check no process/thread has woken up 868 time.sleep(DELTA) 869 self.assertReturnsIfImplemented(0, get_value, woken) 870 871 # wake them all up 872 cond.acquire() 873 cond.notify_all() 874 cond.release() 875 876 # check they have all woken 877 for i in range(10): 878 try: 879 if get_value(woken) == 6: 880 break 881 except NotImplementedError: 882 break 883 time.sleep(DELTA) 884 self.assertReturnsIfImplemented(6, get_value, woken) 885 886 # check state is not mucked up 887 self.check_invariant(cond) 888 889 def test_timeout(self): 890 cond = self.Condition() 891 wait = TimingWrapper(cond.wait) 892 cond.acquire() 893 res = wait(TIMEOUT1) 894 cond.release() 895 self.assertEqual(res, None) 896 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1) 897 898 899class _TestEvent(BaseTestCase): 900 901 @classmethod 902 def _test_event(cls, event): 903 time.sleep(TIMEOUT2) 904 event.set() 905 906 def test_event(self): 907 event = self.Event() 908 wait = TimingWrapper(event.wait) 909 910 # Removed temporarily, due to API shear, this does not 911 # work with threading._Event objects. is_set == isSet 912 self.assertEqual(event.is_set(), False) 913 914 # Removed, threading.Event.wait() will return the value of the __flag 915 # instead of None. 
API Shear with the semaphore backed mp.Event 916 self.assertEqual(wait(0.0), False) 917 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 918 self.assertEqual(wait(TIMEOUT1), False) 919 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1) 920 921 event.set() 922 923 # See note above on the API differences 924 self.assertEqual(event.is_set(), True) 925 self.assertEqual(wait(), True) 926 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 927 self.assertEqual(wait(TIMEOUT1), True) 928 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 929 # self.assertEqual(event.is_set(), True) 930 931 event.clear() 932 933 #self.assertEqual(event.is_set(), False) 934 935 p = self.Process(target=self._test_event, args=(event,)) 936 p.daemon = True 937 p.start() 938 self.assertEqual(wait(), True) 939 940# 941# 942# 943 944class _TestValue(BaseTestCase): 945 946 ALLOWED_TYPES = ('processes',) 947 948 codes_values = [ 949 ('i', 4343, 24234), 950 ('d', 3.625, -4.25), 951 ('h', -232, 234), 952 ('c', latin('x'), latin('y')) 953 ] 954 955 def setUp(self): 956 if not HAS_SHAREDCTYPES: 957 self.skipTest("requires multiprocessing.sharedctypes") 958 959 @classmethod 960 def _test(cls, values): 961 for sv, cv in zip(values, cls.codes_values): 962 sv.value = cv[2] 963 964 965 def test_value(self, raw=False): 966 if raw: 967 values = [self.RawValue(code, value) 968 for code, value, _ in self.codes_values] 969 else: 970 values = [self.Value(code, value) 971 for code, value, _ in self.codes_values] 972 973 for sv, cv in zip(values, self.codes_values): 974 self.assertEqual(sv.value, cv[1]) 975 976 proc = self.Process(target=self._test, args=(values,)) 977 proc.daemon = True 978 proc.start() 979 proc.join() 980 981 for sv, cv in zip(values, self.codes_values): 982 self.assertEqual(sv.value, cv[2]) 983 984 def test_rawvalue(self): 985 self.test_value(raw=True) 986 987 def test_getobj_getlock(self): 988 val1 = self.Value('i', 5) 989 lock1 = val1.get_lock() 990 obj1 = val1.get_obj() 991 992 val2 = 
self.Value('i', 5, lock=None) 993 lock2 = val2.get_lock() 994 obj2 = val2.get_obj() 995 996 lock = self.Lock() 997 val3 = self.Value('i', 5, lock=lock) 998 lock3 = val3.get_lock() 999 obj3 = val3.get_obj() 1000 self.assertEqual(lock, lock3) 1001 1002 arr4 = self.Value('i', 5, lock=False) 1003 self.assertFalse(hasattr(arr4, 'get_lock')) 1004 self.assertFalse(hasattr(arr4, 'get_obj')) 1005 1006 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue') 1007 1008 arr5 = self.RawValue('i', 5) 1009 self.assertFalse(hasattr(arr5, 'get_lock')) 1010 self.assertFalse(hasattr(arr5, 'get_obj')) 1011 1012 1013class _TestArray(BaseTestCase): 1014 1015 ALLOWED_TYPES = ('processes',) 1016 1017 @classmethod 1018 def f(cls, seq): 1019 for i in range(1, len(seq)): 1020 seq[i] += seq[i-1] 1021 1022 @unittest.skipIf(c_int is None, "requires _ctypes") 1023 def test_array(self, raw=False): 1024 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831] 1025 if raw: 1026 arr = self.RawArray('i', seq) 1027 else: 1028 arr = self.Array('i', seq) 1029 1030 self.assertEqual(len(arr), len(seq)) 1031 self.assertEqual(arr[3], seq[3]) 1032 self.assertEqual(list(arr[2:7]), list(seq[2:7])) 1033 1034 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4]) 1035 1036 self.assertEqual(list(arr[:]), seq) 1037 1038 self.f(seq) 1039 1040 p = self.Process(target=self.f, args=(arr,)) 1041 p.daemon = True 1042 p.start() 1043 p.join() 1044 1045 self.assertEqual(list(arr[:]), seq) 1046 1047 @unittest.skipIf(c_int is None, "requires _ctypes") 1048 def test_array_from_size(self): 1049 size = 10 1050 # Test for zeroing (see issue #11675). 1051 # The repetition below strengthens the test by increasing the chances 1052 # of previously allocated non-zero memory being used for the new array 1053 # on the 2nd and 3rd loops. 
1054 for _ in range(3): 1055 arr = self.Array('i', size) 1056 self.assertEqual(len(arr), size) 1057 self.assertEqual(list(arr), [0] * size) 1058 arr[:] = range(10) 1059 self.assertEqual(list(arr), range(10)) 1060 del arr 1061 1062 @unittest.skipIf(c_int is None, "requires _ctypes") 1063 def test_rawarray(self): 1064 self.test_array(raw=True) 1065 1066 @unittest.skipIf(c_int is None, "requires _ctypes") 1067 def test_array_accepts_long(self): 1068 arr = self.Array('i', 10L) 1069 self.assertEqual(len(arr), 10) 1070 raw_arr = self.RawArray('i', 10L) 1071 self.assertEqual(len(raw_arr), 10) 1072 1073 @unittest.skipIf(c_int is None, "requires _ctypes") 1074 def test_getobj_getlock_obj(self): 1075 arr1 = self.Array('i', range(10)) 1076 lock1 = arr1.get_lock() 1077 obj1 = arr1.get_obj() 1078 1079 arr2 = self.Array('i', range(10), lock=None) 1080 lock2 = arr2.get_lock() 1081 obj2 = arr2.get_obj() 1082 1083 lock = self.Lock() 1084 arr3 = self.Array('i', range(10), lock=lock) 1085 lock3 = arr3.get_lock() 1086 obj3 = arr3.get_obj() 1087 self.assertEqual(lock, lock3) 1088 1089 arr4 = self.Array('i', range(10), lock=False) 1090 self.assertFalse(hasattr(arr4, 'get_lock')) 1091 self.assertFalse(hasattr(arr4, 'get_obj')) 1092 self.assertRaises(AttributeError, 1093 self.Array, 'i', range(10), lock='notalock') 1094 1095 arr5 = self.RawArray('i', range(10)) 1096 self.assertFalse(hasattr(arr5, 'get_lock')) 1097 self.assertFalse(hasattr(arr5, 'get_obj')) 1098 1099# 1100# 1101# 1102 1103class _TestContainers(BaseTestCase): 1104 1105 ALLOWED_TYPES = ('manager',) 1106 1107 def test_list(self): 1108 a = self.list(range(10)) 1109 self.assertEqual(a[:], range(10)) 1110 1111 b = self.list() 1112 self.assertEqual(b[:], []) 1113 1114 b.extend(range(5)) 1115 self.assertEqual(b[:], range(5)) 1116 1117 self.assertEqual(b[2], 2) 1118 self.assertEqual(b[2:10], [2,3,4]) 1119 1120 b *= 2 1121 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]) 1122 1123 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 
0, 1, 2, 3, 4, 5, 6])

        self.assertEqual(a[:], range(10))

        d = [a, b]
        e = self.list(d)
        self.assertEqual(
            e[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        # Store 65..69 -> 'A'..'E' and check that copy(), keys(), values()
        # and items() all report the same contents.
        d = self.dict()
        indices = range(65, 70)
        for i in indices:
            d[i] = chr(i)
        self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
        self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])

    def test_namespace(self):
        # Attributes can be set, read and deleted.  After 'job' is deleted
        # str() is expected to show only 'name' (not '_hidden').
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertTrue(not hasattr(n, 'job'))

#
#
#

def sqr(x, wait=0.0):
    """Sleep for `wait` seconds, then return x squared."""
    time.sleep(wait)
    return x*x

def identity(x):
    """Return x unchanged."""
    return x

class CountedObject(object):
    """Object whose class counter goes up in __new__ and down in __del__."""
    n_instances = 0

    def __new__(cls):
        cls.n_instances += 1
        return object.__new__(cls)

    def __del__(self):
        type(self).n_instances -= 1

# Raised by exception_throwing_generator() when it reaches position `when`.
class SayWhenError(ValueError): pass

def exception_throwing_generator(total, when):
    """Yield 0..total-1, raising SayWhenError when index `when` is reached."""
    for i in range(total):
        if i == when:
            raise SayWhenError("Somebody said when")
        yield i

class _TestPool(BaseTestCase):

    def test_apply(self):
        # apply() takes both positional and keyword arguments for the task.
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        # Pool.map() must agree with the builtin map(), with and without an
        # explicit chunksize.
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
        self.assertEqual(pmap(sqr, range(100), chunksize=20),
                         map(sqr, range(100)))

    def test_map_unplicklable(self):
        # Issue #19425 -- failure to pickle should not cause a hang
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        class A(object):
            def __reduce__(self):
                raise RuntimeError('cannot pickle')
        with self.assertRaises(RuntimeError):
            self.pool.map(sqr, [A()]*10)

    def test_map_chunksize(self):
        # map_async() over an empty list with a chunksize must complete
        # within TIMEOUT1 instead of stalling.
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        # The task sleeps for TIMEOUT1, so get() should take about that long
        # (timing is only checked when CHECK_TIMINGS is set).
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The task sleeps longer than the get() timeout, so get() must raise.
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        # Consume imap() as a whole list, item by item, and with a chunksize.
        it = self.pool.imap(sqr, range(10))
        self.assertEqual(list(it), map(sqr, range(10)))

        it = self.pool.imap(sqr, range(10))
        for i in range(10):
            self.assertEqual(it.next(), i*i)
        self.assertRaises(StopIteration, it.next)

        it = self.pool.imap(sqr, range(1000), chunksize=100)
        for i in range(1000):
            self.assertEqual(it.next(), i*i)
        self.assertRaises(StopIteration, it.next)

    def test_imap_handle_iterable_exception(self):
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        # With chunksize 1 every result before the failing item is delivered.
        it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
        for i in range(3):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, it.next)

        # SayWhenError seen at start of problematic chunk's results
        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2)
        for i in range(6):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, it.next)
        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4)
        for i in range(4):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, it.next)

    def test_imap_unordered(self):
        # Results may arrive in any order, so compare after sorting.
        it = self.pool.imap_unordered(sqr, range(100))
        self.assertEqual(sorted(it), map(sqr, range(100)))

        it = self.pool.imap_unordered(sqr, range(1000), chunksize=100)
        self.assertEqual(sorted(it), map(sqr, range(1000)))

    def test_imap_unordered_handle_iterable_exception(self):
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        it = self.pool.imap_unordered(sqr,
                                      exception_throwing_generator(10, 3),
                                      1)
        expected_values = map(sqr, range(10))
        with self.assertRaises(SayWhenError):
            # imap_unordered makes it difficult to anticipate the SayWhenError
            for i in range(10):
                value = next(it)
                self.assertIn(value, expected_values)
                # remove() makes sure no value is delivered twice
                expected_values.remove(value)

        it = self.pool.imap_unordered(sqr,
                                      exception_throwing_generator(20, 7),
                                      2)
        expected_values = map(sqr, range(20))
        with self.assertRaises(SayWhenError):
            for i in range(20):
                value = next(it)
                self.assertIn(value, expected_values)
                expected_values.remove(value)

    def test_make_pool(self):
        # Non-positive worker counts are rejected; Pool(3) starts 3 workers.
        self.assertRaises(ValueError, multiprocessing.Pool, -1)
        self.assertRaises(ValueError, multiprocessing.Pool, 0)

        p = multiprocessing.Pool(3)
        self.assertEqual(3, len(p._pool))
        p.close()
        p.join()

    def test_terminate(self):
        # Queue far more sleeping tasks than can finish, then check that
        # join() returns in under 0.2 seconds once terminate() was called.
        p = self.Pool(4)
        result = p.map_async(
            time.sleep, [0.1 for i in range(10000)], chunksize=1
            )
        p.terminate()
        join = TimingWrapper(p.join)
        join()
        self.assertTrue(join.elapsed < 0.2)

    def test_empty_iterable(self):
        # See Issue 12157
        # Every mapping flavour must return an empty result for empty input.
        p = self.Pool(1)

        self.assertEqual(p.map(sqr, []), [])
        self.assertEqual(list(p.imap(sqr, [])), [])
        self.assertEqual(list(p.imap_unordered(sqr, [])), [])
        self.assertEqual(p.map_async(sqr, []).get(), [])

        p.close()
        p.join()

    def test_release_task_refs(self):
        # Issue #29861: task arguments and results should not be kept
        # alive after we are done with them.
        objs = list(CountedObject() for i in range(10))
        refs = list(weakref.ref(o) for o in objs)
        self.pool.map(identity, objs)

        del objs
        time.sleep(DELTA)  # let threaded cleanup code run
        # every weak reference must now be dead
        self.assertEqual(set(wr() for wr in refs), {None})
        # With a process pool, copies of the objects are returned, check
        # they were released too.
        self.assertEqual(CountedObject.n_instances, 0)


def unpickleable_result():
    """Return a lambda; the test below expects fetching it to fail."""
    return lambda: 42

class _TestPoolWorkerErrors(BaseTestCase):
    ALLOWED_TYPES = ('processes', )

    def test_unpickleable_result(self):
        from multiprocessing.pool import MaybeEncodingError
        p = multiprocessing.Pool(2)

        # Make sure we don't lose pool processes because of encoding errors.
        # (20 failing tasks are submitted to a pool of only 2 workers.)
        for iteration in range(20):
            res = p.apply_async(unpickleable_result)
            self.assertRaises(MaybeEncodingError, res.get)

        p.close()
        p.join()

class _TestPoolWorkerLifetime(BaseTestCase):

    ALLOWED_TYPES = ('processes', )
    def test_pool_worker_lifetime(self):
        p = multiprocessing.Pool(3, maxtasksperchild=10)
        self.assertEqual(3, len(p._pool))
        # remember the initial worker pids for the comparison at the end
        origworkerpids = [w.pid for w in p._pool]
        # Run many tasks so each worker gets replaced (hopefully)
        results = []
        for i in range(100):
            results.append(p.apply_async(sqr, (i, )))
        # Fetch the results and verify we got the right answers,
        # also ensuring all the tasks have completed.
        for (j, res) in enumerate(results):
            self.assertEqual(res.get(), sqr(j))
        # Refill the pool
        p._repopulate_pool()
        # Wait until all workers are alive
        # (countdown * DELTA = 5 seconds max startup process time)
        countdown = 50
        while countdown and not all(w.is_alive() for w in p._pool):
            countdown -= 1
            time.sleep(DELTA)
        finalworkerpids = [w.pid for w in p._pool]
        # All pids should be assigned.  See issue #7805.
        self.assertNotIn(None, origworkerpids)
        self.assertNotIn(None, finalworkerpids)
        # Finally, check that the worker pids have changed
        self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
        p.close()
        p.join()

    def test_pool_worker_lifetime_early_close(self):
        # Issue #10332: closing a pool whose workers have limited lifetimes
        # before all the tasks completed would make join() hang.
        # Six tasks of 0.3 seconds each are queued on three workers that
        # each handle a single task (maxtasksperchild=1).
        p = multiprocessing.Pool(3, maxtasksperchild=1)
        results = []
        for i in range(6):
            results.append(p.apply_async(sqr, (i, 0.3)))
        p.close()
        p.join()
        # check the results
        for (j, res) in enumerate(results):
            self.assertEqual(res.get(), sqr(j))


#
# Test that manager has expected number of shared objects left
#

class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    # NOTE(review): no comment about leaked shared objects is visible in
    # _TestPool.test_terminate() -- confirm the reference is still valid.
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            # Print both a fresh snapshot and the one taken above before
            # the assertion below fails.
            print self.manager._debug_info()
            print debug_info

        self.assertEqual(refs, EXPECTED_NUMBER)

#
# Test of creating a customized manager class
#

from multiprocessing.managers import BaseManager, BaseProxy, RemoteError

class FooBar(object):
    """Target type registered with MyManager as both 'Foo' and 'Bar'."""
    def f(self):
        return 'f()'
    def g(self):
        raise ValueError
    def _h(self):
        return '_h()'

def baz():
    """Generate the squares of 0..9."""
    for i in xrange(10):
        yield i*i

class IteratorProxy(BaseProxy):
    """Proxy that forwards next()/__next__() calls through _callmethod()."""
    _exposed_ = ('next', '__next__')
    def __iter__(self):
        return self
    def next(self):
        return self._callmethod('next')
    def __next__(self):
        return self._callmethod('__next__')

class MyManager(BaseManager):
    pass

# 'Foo' uses the default set of exposed methods; 'Bar' exposes exactly 'f'
# and '_h'; 'baz' hands back its generator through IteratorProxy.
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)


class _TestMyManager(BaseTestCase):

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()

        foo = manager.Foo()
        bar = manager.Bar()
        baz = manager.baz()

        # Which of the three FooBar methods does each proxy offer?
        foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
        bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]

        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        self.assertEqual(list(baz), [i*i for i in range(10)])

        manager.shutdown()

#
# Test of connecting to a remote server and using xmlrpclib for serialization
#

# Queue object handed out by get_queue() below.
_queue = Queue.Queue()
def get_queue():
    """Return the module-level _queue."""
    return _queue

class QueueManager(BaseManager):
    '''manager class used by server process'''
QueueManager.register('get_queue', callable=get_queue)

class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''
QueueManager2.register('get_queue')


SERIALIZER = 'xmlrpclib'

class _TestRemoteManager(BaseTestCase):

    ALLOWED_TYPES = ('manager',)
    # `values` is what the child puts on the queue (as a tuple); `result`
    # is the list the parent expects to read back.
    values = ['hello world', None, True, 2.25,
              #'hall\xc3\xa5 v\xc3\xa4rlden'] # UTF-8
              ]
    result = values[:]
    if support.have_unicode:
        #result[-1] = u'hall\xe5 v\xe4rlden'
        uvalue = support.u(r'\u043f\u0440\u0438\u0432\u0456\u0442 '
                           r'\u0441\u0432\u0456\u0442')
        values.append(uvalue)
        result.append(uvalue)

    @classmethod
    def _putter(cls, address, authkey):
        # Child side: connect to the server and put the test values.
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        # Note that xmlrpclib will deserialize object as a list not a tuple
        queue.put(tuple(cls.values))

    def test_remote(self):
        authkey = os.urandom(32)

        manager = QueueManager(
            address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()

        # Second client connection, made from this process.
        manager2 = QueueManager2(
            address=manager.address, authkey=authkey, serializer=SERIALIZER
            )
        manager2.connect()
        queue = manager2.get_queue()

        self.assertEqual(queue.get(), self.result)

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del queue
        manager.shutdown()

class _TestManagerRestart(BaseTestCase):

    @classmethod
    def _putter(cls, address, authkey):
        # Child side: connect to the running manager and queue one string.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        # keep the server address so a second manager can reuse it later
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.start()
        p.join()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        del queue
        manager.shutdown()

        # Start and stop a second manager on the address saved in `addr`.
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        manager.start()
        manager.shutdown()

#
#
#

# Empty message that tells the _echo() loop to stop.
SENTINEL = latin('')

class _TestConnection(BaseTestCase):

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        # Send every received message back until SENTINEL arrives.
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', range(4))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # receive into the start of a zero-filled buffer
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # receive at an offset of three items into the buffer
            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # 40-byte buffer, deliberately shorter than longmsg (110 bytes)
            buffer = bytearray(latin(' ' * 40))
self.assertEqual(conn.send_bytes(longmsg), None) 1653 try: 1654 res = conn.recv_bytes_into(buffer) 1655 except multiprocessing.BufferTooShort, e: 1656 self.assertEqual(e.args, (longmsg,)) 1657 else: 1658 self.fail('expected BufferTooShort, got %s' % res) 1659 1660 poll = TimingWrapper(conn.poll) 1661 1662 self.assertEqual(poll(), False) 1663 self.assertTimingAlmostEqual(poll.elapsed, 0) 1664 1665 self.assertEqual(poll(TIMEOUT1), False) 1666 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1) 1667 1668 conn.send(None) 1669 time.sleep(.1) 1670 1671 self.assertEqual(poll(TIMEOUT1), True) 1672 self.assertTimingAlmostEqual(poll.elapsed, 0) 1673 1674 self.assertEqual(conn.recv(), None) 1675 1676 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb 1677 conn.send_bytes(really_big_msg) 1678 self.assertEqual(conn.recv_bytes(), really_big_msg) 1679 1680 conn.send_bytes(SENTINEL) # tell child to quit 1681 child_conn.close() 1682 1683 if self.TYPE == 'processes': 1684 self.assertEqual(conn.readable, True) 1685 self.assertEqual(conn.writable, True) 1686 self.assertRaises(EOFError, conn.recv) 1687 self.assertRaises(EOFError, conn.recv_bytes) 1688 1689 p.join() 1690 1691 def test_duplex_false(self): 1692 reader, writer = self.Pipe(duplex=False) 1693 self.assertEqual(writer.send(1), None) 1694 self.assertEqual(reader.recv(), 1) 1695 if self.TYPE == 'processes': 1696 self.assertEqual(reader.readable, True) 1697 self.assertEqual(reader.writable, False) 1698 self.assertEqual(writer.readable, False) 1699 self.assertEqual(writer.writable, True) 1700 self.assertRaises(IOError, reader.send, 2) 1701 self.assertRaises(IOError, writer.recv) 1702 self.assertRaises(IOError, writer.poll) 1703 1704 def test_spawn_close(self): 1705 # We test that a pipe connection can be closed by parent 1706 # process immediately after child is spawned. 
On Windows this 1707 # would have sometimes failed on old versions because 1708 # child_conn would be closed before the child got a chance to 1709 # duplicate it. 1710 conn, child_conn = self.Pipe() 1711 1712 p = self.Process(target=self._echo, args=(child_conn,)) 1713 p.daemon = True 1714 p.start() 1715 child_conn.close() # this might complete before child initializes 1716 1717 msg = latin('hello') 1718 conn.send_bytes(msg) 1719 self.assertEqual(conn.recv_bytes(), msg) 1720 1721 conn.send_bytes(SENTINEL) 1722 conn.close() 1723 p.join() 1724 1725 def test_sendbytes(self): 1726 if self.TYPE != 'processes': 1727 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1728 1729 msg = latin('abcdefghijklmnopqrstuvwxyz') 1730 a, b = self.Pipe() 1731 1732 a.send_bytes(msg) 1733 self.assertEqual(b.recv_bytes(), msg) 1734 1735 a.send_bytes(msg, 5) 1736 self.assertEqual(b.recv_bytes(), msg[5:]) 1737 1738 a.send_bytes(msg, 7, 8) 1739 self.assertEqual(b.recv_bytes(), msg[7:7+8]) 1740 1741 a.send_bytes(msg, 26) 1742 self.assertEqual(b.recv_bytes(), latin('')) 1743 1744 a.send_bytes(msg, 26, 0) 1745 self.assertEqual(b.recv_bytes(), latin('')) 1746 1747 self.assertRaises(ValueError, a.send_bytes, msg, 27) 1748 1749 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5) 1750 1751 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1) 1752 1753 self.assertRaises(ValueError, a.send_bytes, msg, -1) 1754 1755 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1) 1756 1757 @classmethod 1758 def _is_fd_assigned(cls, fd): 1759 try: 1760 os.fstat(fd) 1761 except OSError as e: 1762 if e.errno == errno.EBADF: 1763 return False 1764 raise 1765 else: 1766 return True 1767 1768 @classmethod 1769 def _writefd(cls, conn, data, create_dummy_fds=False): 1770 if create_dummy_fds: 1771 for i in range(0, 256): 1772 if not cls._is_fd_assigned(i): 1773 os.dup2(conn.fileno(), i) 1774 fd = reduction.recv_handle(conn) 1775 if msvcrt: 1776 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY) 1777 
os.write(fd, data) 1778 os.close(fd) 1779 1780 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 1781 def test_fd_transfer(self): 1782 if self.TYPE != 'processes': 1783 self.skipTest("only makes sense with processes") 1784 conn, child_conn = self.Pipe(duplex=True) 1785 1786 p = self.Process(target=self._writefd, args=(child_conn, b"foo")) 1787 p.daemon = True 1788 p.start() 1789 with open(support.TESTFN, "wb") as f: 1790 fd = f.fileno() 1791 if msvcrt: 1792 fd = msvcrt.get_osfhandle(fd) 1793 reduction.send_handle(conn, fd, p.pid) 1794 p.join() 1795 with open(support.TESTFN, "rb") as f: 1796 self.assertEqual(f.read(), b"foo") 1797 1798 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 1799 @unittest.skipIf(sys.platform == "win32", 1800 "test semantics don't make sense on Windows") 1801 @unittest.skipIf(MAXFD <= 256, 1802 "largest assignable fd number is too small") 1803 @unittest.skipUnless(hasattr(os, "dup2"), 1804 "test needs os.dup2()") 1805 def test_large_fd_transfer(self): 1806 # With fd > 256 (issue #11657) 1807 if self.TYPE != 'processes': 1808 self.skipTest("only makes sense with processes") 1809 conn, child_conn = self.Pipe(duplex=True) 1810 1811 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True)) 1812 p.daemon = True 1813 p.start() 1814 with open(support.TESTFN, "wb") as f: 1815 fd = f.fileno() 1816 for newfd in range(256, MAXFD): 1817 if not self._is_fd_assigned(newfd): 1818 break 1819 else: 1820 self.fail("could not find an unassigned large file descriptor") 1821 os.dup2(fd, newfd) 1822 try: 1823 reduction.send_handle(conn, newfd, p.pid) 1824 finally: 1825 os.close(newfd) 1826 p.join() 1827 with open(support.TESTFN, "rb") as f: 1828 self.assertEqual(f.read(), b"bar") 1829 1830 @classmethod 1831 def _send_data_without_fd(self, conn): 1832 os.write(conn.fileno(), b"\0") 1833 1834 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 1835 
@unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows") 1836 def test_missing_fd_transfer(self): 1837 # Check that exception is raised when received data is not 1838 # accompanied by a file descriptor in ancillary data. 1839 if self.TYPE != 'processes': 1840 self.skipTest("only makes sense with processes") 1841 conn, child_conn = self.Pipe(duplex=True) 1842 1843 p = self.Process(target=self._send_data_without_fd, args=(child_conn,)) 1844 p.daemon = True 1845 p.start() 1846 self.assertRaises(RuntimeError, reduction.recv_handle, conn) 1847 p.join() 1848 1849class _TestListenerClient(BaseTestCase): 1850 1851 ALLOWED_TYPES = ('processes', 'threads') 1852 1853 @classmethod 1854 def _test(cls, address): 1855 conn = cls.connection.Client(address) 1856 conn.send('hello') 1857 conn.close() 1858 1859 def test_listener_client(self): 1860 for family in self.connection.families: 1861 l = self.connection.Listener(family=family) 1862 p = self.Process(target=self._test, args=(l.address,)) 1863 p.daemon = True 1864 p.start() 1865 conn = l.accept() 1866 self.assertEqual(conn.recv(), 'hello') 1867 p.join() 1868 l.close() 1869 1870 def test_issue14725(self): 1871 l = self.connection.Listener() 1872 p = self.Process(target=self._test, args=(l.address,)) 1873 p.daemon = True 1874 p.start() 1875 time.sleep(1) 1876 # On Windows the client process should by now have connected, 1877 # written data and closed the pipe handle by now. This causes 1878 # ConnectNamdedPipe() to fail with ERROR_NO_DATA. See Issue 1879 # 14725. 
        conn = l.accept()
        self.assertEqual(conn.recv(), 'hello')
        conn.close()
        p.join()
        l.close()

#
# Test of sending connection and socket objects between processes
#
# NOTE: the test case below is disabled by being wrapped in a string literal.
"""
class _TestPicklingConnections(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def _listener(self, conn, families):
        for fam in families:
            l = self.connection.Listener(family=fam)
            conn.send(l.address)
            new_conn = l.accept()
            conn.send(new_conn)

        if self.TYPE == 'processes':
            l = socket.socket()
            l.bind(('localhost', 0))
            conn.send(l.getsockname())
            l.listen(1)
            new_conn, addr = l.accept()
            conn.send(new_conn)

        conn.recv()

    def _remote(self, conn):
        for (address, msg) in iter(conn.recv, None):
            client = self.connection.Client(address)
            client.send(msg.upper())
            client.close()

        if self.TYPE == 'processes':
            address, msg = conn.recv()
            client = socket.socket()
            client.connect(address)
            client.sendall(msg.upper())
            client.close()

        conn.close()

    def test_pickling(self):
        try:
            multiprocessing.allow_connection_pickling()
        except ImportError:
            return

        families = self.connection.families

        lconn, lconn0 = self.Pipe()
        lp = self.Process(target=self._listener, args=(lconn0, families))
        lp.daemon = True
        lp.start()
        lconn0.close()

        rconn, rconn0 = self.Pipe()
        rp = self.Process(target=self._remote, args=(rconn0,))
        rp.daemon = True
        rp.start()
        rconn0.close()

        for fam in families:
            msg = ('This connection uses family %s' % fam).encode('ascii')
            address = lconn.recv()
            rconn.send((address, msg))
            new_conn = lconn.recv()
            self.assertEqual(new_conn.recv(), msg.upper())

        rconn.send(None)

        if self.TYPE == 'processes':
            msg = latin('This connection uses a normal socket')
            address = lconn.recv()
            rconn.send((address, msg))
            if hasattr(socket, 'fromfd'):
                new_conn = lconn.recv()
                self.assertEqual(new_conn.recv(100), msg.upper())
            else:
                # XXX On Windows with Py2.6 need to backport fromfd()
                discard = lconn.recv_bytes()

        lconn.send(None)

        rconn.close()
        lconn.close()

        lp.join()
        rp.join()
"""
#
#
#

class _TestHeap(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for i in xrange(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            b = multiprocessing.heap.BufferWrapper(size)
            blocks.append(b)
            if len(blocks) > maxblocks:
                # drop a randomly chosen block to keep the list bounded
                i = random.randrange(maxblocks)
                del blocks[i]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap
        all = []
        occupied = 0
        # hold the heap lock while inspecting; released by the cleanup
        heap._lock.acquire()
        self.addCleanup(heap._lock.release)
        for L in heap._len_to_seq.values():
            for arena, start, stop in L:
                all.append((heap._arenas.index(arena), start, stop,
                            stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            all.append((heap._arenas.index(arena), start, stop,
                        stop-start, 'occupied'))
            occupied += (stop-start)

        all.sort()

        # After sorting, each block must either end where the next one
        # starts, or the next one must start a different arena at offset 0.
        for i in range(len(all)-1):
            (arena, start, stop) = all[i][:3]
            (narena, nstart, nstop) = all[i+1][:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))

    def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't deadlock
        # (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        # restore the original thresholds when the test is over
        thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for i in range(5000):
            a = multiprocessing.heap.BufferWrapper(1)
            b = multiprocessing.heap.BufferWrapper(1)
            # circular references
            a.buddy = b
            b.buddy = a

#
#
#

# Structure with an int field 'x' and a double field 'y', used by the
# shared ctypes tests below.
class _Foo(Structure):
    _fields_ = [
        ('x', c_int),
        ('y', c_double)
        ]

class _TestSharedCTypes(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        # Child side: double every shared value in place.
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for i in range(len(arr)):
            arr[i] *= 2

    def test_sharedctypes(self, lock=False):
        # Create shared values, let a child process double them, then check
        # that the parent sees the doubled values.
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', range(10), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        p = self.Process(target=self._double, args=(x, y, foo, arr, string))
        p.daemon = True
        p.start()
        p.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for i in range(10):
            self.assertAlmostEqual(arr[i], i*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # Same scenario as test_sharedctypes, with lock=True.
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        # The copy must keep its values after the original is modified.
        foo = _Foo(2, 5.0)
        bar = copy(foo)
        foo.x = 0
        foo.y = 0
        self.assertEqual(bar.x, 2)
self.assertAlmostEqual(bar.y, 5.0) 2102 2103# 2104# 2105# 2106 2107class _TestFinalize(BaseTestCase): 2108 2109 ALLOWED_TYPES = ('processes',) 2110 2111 def setUp(self): 2112 self.registry_backup = util._finalizer_registry.copy() 2113 util._finalizer_registry.clear() 2114 2115 def tearDown(self): 2116 self.assertFalse(util._finalizer_registry) 2117 util._finalizer_registry.update(self.registry_backup) 2118 2119 @classmethod 2120 def _test_finalize(cls, conn): 2121 class Foo(object): 2122 pass 2123 2124 a = Foo() 2125 util.Finalize(a, conn.send, args=('a',)) 2126 del a # triggers callback for a 2127 2128 b = Foo() 2129 close_b = util.Finalize(b, conn.send, args=('b',)) 2130 close_b() # triggers callback for b 2131 close_b() # does nothing because callback has already been called 2132 del b # does nothing because callback has already been called 2133 2134 c = Foo() 2135 util.Finalize(c, conn.send, args=('c',)) 2136 2137 d10 = Foo() 2138 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1) 2139 2140 d01 = Foo() 2141 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0) 2142 d02 = Foo() 2143 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0) 2144 d03 = Foo() 2145 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0) 2146 2147 util.Finalize(None, conn.send, args=('e',), exitpriority=-10) 2148 2149 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100) 2150 2151 # call multiprocessing's cleanup function then exit process without 2152 # garbage collecting locals 2153 util._exit_function() 2154 conn.close() 2155 os._exit(0) 2156 2157 def test_finalize(self): 2158 conn, child_conn = self.Pipe() 2159 2160 p = self.Process(target=self._test_finalize, args=(child_conn,)) 2161 p.daemon = True 2162 p.start() 2163 p.join() 2164 2165 result = [obj for obj in iter(conn.recv, 'STOP')] 2166 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e']) 2167 2168 def test_thread_safety(self): 2169 # bpo-24484: _run_finalizers() should 
be thread-safe 2170 def cb(): 2171 pass 2172 2173 class Foo(object): 2174 def __init__(self): 2175 self.ref = self # create reference cycle 2176 # insert finalizer at random key 2177 util.Finalize(self, cb, exitpriority=random.randint(1, 100)) 2178 2179 finish = False 2180 exc = [] 2181 2182 def run_finalizers(): 2183 while not finish: 2184 time.sleep(random.random() * 1e-1) 2185 try: 2186 # A GC run will eventually happen during this, 2187 # collecting stale Foo's and mutating the registry 2188 util._run_finalizers() 2189 except Exception as e: 2190 exc.append(e) 2191 2192 def make_finalizers(): 2193 d = {} 2194 while not finish: 2195 try: 2196 # Old Foo's get gradually replaced and later 2197 # collected by the GC (because of the cyclic ref) 2198 d[random.getrandbits(5)] = {Foo() for i in range(10)} 2199 except Exception as e: 2200 exc.append(e) 2201 d.clear() 2202 2203 old_interval = sys.getcheckinterval() 2204 old_threshold = gc.get_threshold() 2205 try: 2206 sys.setcheckinterval(10) 2207 gc.set_threshold(5, 5, 5) 2208 threads = [threading.Thread(target=run_finalizers), 2209 threading.Thread(target=make_finalizers)] 2210 with support.start_threads(threads): 2211 time.sleep(4.0) # Wait a bit to trigger race condition 2212 finish = True 2213 if exc: 2214 raise exc[0] 2215 finally: 2216 sys.setcheckinterval(old_interval) 2217 gc.set_threshold(*old_threshold) 2218 gc.collect() # Collect remaining Foo's 2219 2220 2221# 2222# Test that from ... 
# import * works for each module
#

class _TestImportStar(BaseTestCase):
    # Checks that every name listed in a module's __all__ really exists.

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        modules = [
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.synchronize', 'multiprocessing.util'
        ]

        if HAS_REDUCTION:
            modules.append('multiprocessing.reduction')

        if c_int is not None:
            # This module requires _ctypes
            modules.append('multiprocessing.sharedctypes')

        for name in modules:
            __import__(name)
            mod = sys.modules[name]

            # Modules without __all__ contribute nothing to check.
            for attr in getattr(mod, '__all__', ()):
                self.assertTrue(
                    hasattr(mod, attr),
                    '%r does not have attribute %r' % (mod, attr)
                )

#
# Quick test that logging works -- does not test logging output
#

class _TestLogging(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        logger.setLevel(util.SUBWARNING)
        self.assertTrue(logger is not None)
        logger.debug('this will not be printed')
        logger.info('nor will this')
        logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        # Child-process body: report the effective level of the
        # multiprocessing logger back to the parent.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def test_level(self):
        # The child must see the level set on the multiprocessing logger,
        # or the root logger's level when the former is NOTSET.
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)

        try:
            logger.setLevel(LEVEL1)
            p = self.Process(target=self._test_level, args=(writer,))
            p.daemon = True
            p.start()
            self.assertEqual(LEVEL1, reader.recv())
            p.join()    # reap the child instead of leaving it behind

            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            p = self.Process(target=self._test_level, args=(writer,))
            p.daemon = True
            p.start()
            self.assertEqual(LEVEL2, reader.recv())
            p.join()
        finally:
            # Restore the global logging state and release the pipe even
            # when an assertion above fails, so later tests are unaffected.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
            reader.close()
            writer.close()


# class _TestLoggingProcessName(BaseTestCase):
#
#     def handle(self, record):
#         assert record.processName == multiprocessing.current_process().name
#         self.__handled = True
#
#     def test_logging(self):
#         handler = logging.Handler()
#         handler.handle = self.handle
#         self.__handled = False
#         # Bypass getLogger() and side-effects
#         logger = logging.getLoggerClass()(
#                 'multiprocessing.test.TestLoggingProcessName')
#         logger.addHandler(handler)
#         logger.propagate = False
#
#         logger.warn('foo')
#         assert self.__handled

#
# Check that Process.join() retries if os.waitpid() fails with EINTR
#

class _TestPollEintr(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _killer(cls, pid):
        # Child-process body: give the parent time to block in join(),
        # then interrupt it with SIGUSR1.
        time.sleep(0.5)
        os.kill(pid, signal.SIGUSR1)

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_poll_eintr(self):
        # A one-element list so the nested handler can mutate it
        # (this file targets Python 2, which has no ``nonlocal``).
        got_signal = [False]
        def record(*args):
            got_signal[0] = True
        pid = os.getpid()
        oldhandler = signal.signal(signal.SIGUSR1, record)
        try:
            killer = self.Process(target=self._killer, args=(pid,))
            killer.start()
            try:
                p = self.Process(target=time.sleep, args=(1,))
                p.start()
                p.join()
            finally:
                # Always reap the killer, even if starting or joining
                # ``p`` raised.
                killer.join()
            self.assertTrue(got_signal[0])
            self.assertEqual(p.exitcode, 0)
        finally:
            signal.signal(signal.SIGUSR1, oldhandler)

#
# Test to verify handle verification, see issue 3321
#

class TestInvalidHandle(unittest.TestCase):

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        # Both an arbitrary bogus handle and a negative one are expected
        # to end in IOError.
        conn = _multiprocessing.Connection(44977608)
        self.assertRaises(IOError, conn.poll)
        self.assertRaises(IOError, _multiprocessing.Connection, -1)

#
# Functions used to create test cases from the base ones in this module
#

def get_attributes(Source, names):
    """Return a dict mapping each of *names* to that attribute of *Source*.

    Plain functions are wrapped in staticmethod() so that they are not
    turned into bound methods once the dict is merged into a class body.
    """
    d = {}
    for name in names:
        obj = getattr(Source, name)
        if type(obj) == type(get_attributes):
            obj = staticmethod(obj)
        d[name] = obj
    return d

def create_test_cases(Mixin, type):
    """Build concrete TestCase classes for one flavour of the tests.

    Every module-level ``_Test*`` base whose ALLOWED_TYPES contains *type*
    is combined with unittest.TestCase and *Mixin*.  The result maps the
    generated class name (``With<Type><BaseNameWithoutUnderscore>``) to
    the generated class.
    """
    result = {}
    glob = globals()
    Type = type.capitalize()

    for name in glob.keys():
        if name.startswith('_Test'):
            base = glob[name]
            if type in base.ALLOWED_TYPES:
                newname = 'With' + Type + name[1:]
                class Temp(base, unittest.TestCase, Mixin):
                    pass
                result[newname] = Temp
                Temp.__name__ = newname
                Temp.__module__ = Mixin.__module__
    return result

#
# Create test cases
#

class ProcessesMixin(object):
    # Supplies the real multiprocessing primitives as class attributes.
    TYPE = 'processes'
    Process = multiprocessing.Process
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue', 'Pool'
        )))

testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)


class ManagerMixin(object):
    # Supplies the manager-proxied primitives as class attributes.
    TYPE = 'manager'
    Process = multiprocessing.Process
    # Created without running __init__; test_main() initialises and starts
    # the manager before the suite runs.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    locals().update(get_attributes(manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
       'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue', 'Pool'
        )))

testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)


class ThreadsMixin(object):
    # Supplies the thread-based multiprocessing.dummy equivalents.
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue', 'Pool'
        )))

testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)

class OtherTest(unittest.TestCase):
    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        # The fake peer answers the challenge with garbage.
        class _FakeConnection(object):
            def recv_bytes(self, size):
                return b'something bogus'
            def send_bytes(self, data):
                pass
        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.deliver_challenge,
                          _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        # The fake peer first sends the CHALLENGE marker, then garbage,
        # then empty bytes on any further read.
        class _FakeConnection(object):
            def __init__(self):
                self.count = 0
            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                elif self.count == 2:
                    return b'something bogus'
                return b''
            def send_bytes(self, data):
                pass
        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.answer_challenge,
                          _FakeConnection(), b'abc')

#
# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
#

def initializer(ns):
    """Increment ``ns.test``; used as the manager/pool initializer."""
    ns.test += 1

class TestInitializers(unittest.TestCase):
    def setUp(self):
        # A manager-backed namespace lets the test observe an increment
        # performed by the initializer in another process.
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        m = multiprocessing.managers.SyncManager()
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            # Do not leak the manager process when the assertion fails.
            m.shutdown()

    def test_pool_initializer(self):
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        self.assertEqual(self.ns.test, 1)

#
# Issue 5155, 5313, 5331: Test process in processes
# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
#

def _this_sub_process(q):
    """Try a non-blocking get on *q*, ignoring an empty queue."""
    try:
        q.get(block=False)
    except Queue.Empty:
        pass

def _test_process(q):
    """Start, then join, a daemonic grandchild that polls a fresh queue.

    *q* is accepted for the caller's benefit but is not used here.
    """
    queue = multiprocessing.Queue()
    subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,))
    subProc.daemon = True
    subProc.start()
    subProc.join()

def _afunc(x):
    """Return the square of *x*."""
    return x*x

def pool_in_process():
    """Create a pool inside a child process and run a small map on it."""
    pool = multiprocessing.Pool(processes=4)
    try:
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        # Shut the workers down rather than abandoning them.
        pool.close()
        pool.join()

class _file_like(object):
    # Write-buffering wrapper around ``delegate`` whose buffer is kept
    # per process: it is reset whenever the current pid differs from the
    # pid that last touched it.
    def __init__(self, delegate):
        self._delegate = delegate
        self._pid = None

    @property
    def cache(self):
        pid = os.getpid()
        # There are no race conditions since fork keeps only the running thread
        if pid != self._pid:
            self._pid = pid
            self._cache = []
        return self._cache

    def write(self, data):
        self.cache.append(data)

    def flush(self):
        self._delegate.write(''.join(self.cache))
        self._cache = []

class TestStdinBadfiledescriptor(unittest.TestCase):

    def test_queue_in_process(self):
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_test_process, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): this process is constructed but never started; it
        # is kept because constructing a Process may have side effects
        # that other tests rely on -- confirm before removing.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # A real assertion method: a bare ``assert`` disappears under -O.
        self.assertEqual(sio.getvalue(), 'foo')
#
# Test interaction with socket timeouts - see Issue #6056
#

class TestTimeouts(unittest.TestCase):
    @classmethod
    def _test_timeout(cls, child, address):
        # Child-process body: wait longer than the parent's default socket
        # timeout, then talk over both the pipe and the listener.
        time.sleep(1)
        child.send(123)
        child.close()
        conn = multiprocessing.connection.Client(address)
        conn.send(456)
        conn.close()

    def test_timeout(self):
        old_timeout = socket.getdefaulttimeout()
        try:
            # A default timeout shorter than the child's one-second delay.
            socket.setdefaulttimeout(0.1)
            parent, child = multiprocessing.Pipe(duplex=True)
            l = multiprocessing.connection.Listener(family='AF_INET')
            p = multiprocessing.Process(target=self._test_timeout,
                                        args=(child, l.address))
            p.start()
            child.close()
            self.assertEqual(parent.recv(), 123)
            parent.close()
            conn = l.accept()
            self.assertEqual(conn.recv(), 456)
            conn.close()
            l.close()
            p.join(10)
        finally:
            socket.setdefaulttimeout(old_timeout)

#
# Test what happens with no "if __name__ == '__main__'"
#

class TestNoForkBomb(unittest.TestCase):
    def test_noforkbomb(self):
        name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
        if WIN32:
            # On Windows the script is expected to fail with a RuntimeError
            # and print nothing.
            rc, out, err = test.script_helper.assert_python_failure(name)
            self.assertEqual(out, '')
            self.assertIn('RuntimeError', err)
        else:
            rc, out, err = test.script_helper.assert_python_ok(name)
            self.assertEqual(out.rstrip(), '123')
            self.assertEqual(err, '')

#
# Issue 12098: check sys.flags of child matches that for parent
#

class TestFlags(unittest.TestCase):
    @classmethod
    def run_in_grandchild(cls, conn):
        conn.send(tuple(sys.flags))

    @classmethod
    def run_in_child(cls):
        # Runs in the interpreter spawned by test_flags(): prints this
        # process's flags and its own child's flags as one JSON document.
        import json
        r, w = multiprocessing.Pipe(duplex=False)
        p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,))
        p.start()
        grandchild_flags = r.recv()
        p.join()
        r.close()
        w.close()
        flags = (tuple(sys.flags), grandchild_flags)
        print(json.dumps(flags))

    @support.requires_unicode  # XXX json needs unicode support
    def test_flags(self):
        import json, subprocess
        # start child process using unusual flags
        prog = ('from test.test_multiprocessing import TestFlags; ' +
                'TestFlags.run_in_child()')
        data = subprocess.check_output(
            [sys.executable, '-E', '-B', '-O', '-c', prog])
        child_flags, grandchild_flags = json.loads(data.decode('ascii'))
        self.assertEqual(child_flags, grandchild_flags)

#
# Issue #17555: ForkAwareThreadLock
#

class TestForkAwareThreadLock(unittest.TestCase):
    # We recursively start processes.  Issue #17555 meant that the
    # after fork registry would get duplicate entries for the same
    # lock.  The size of the registry at generation n was ~2**n.

    @classmethod
    def child(cls, n, conn):
        if n > 1:
            p = multiprocessing.Process(target=cls.child, args=(n-1, conn))
            p.start()
            p.join()
        else:
            # Last generation: report the registry size to the test.
            conn.send(len(util._afterfork_registry))
        conn.close()

    def test_lock(self):
        r, w = multiprocessing.Pipe(False)
        # Not used directly below; the lock is created before the registry
        # is measured and stays referenced while the children run.
        l = util.ForkAwareThreadLock()
        old_size = len(util._afterfork_registry)
        p = multiprocessing.Process(target=self.child, args=(5, w))
        p.start()
        new_size = r.recv()
        p.join()
        self.assertLessEqual(new_size, old_size)

#
# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
#

class TestIgnoreEINTR(unittest.TestCase):

    # Sending CONN_MAX_SIZE bytes into a multiprocessing pipe must block
    CONN_MAX_SIZE = max(support.PIPE_MAX_SIZE, support.SOCK_MAX_SIZE)

    @classmethod
    def _test_ignore(cls, conn):
        # Child-process body: install a no-op SIGUSR1 handler, then echo
        # one object and send one oversized message.
        def handler(signum, frame):
            pass
        signal.signal(signal.SIGUSR1, handler)
        conn.send('ready')
        x = conn.recv()
        conn.send(x)
        conn.send_bytes(b'x' * cls.CONN_MAX_SIZE)

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_ignore(self):
        conn, child_conn = multiprocessing.Pipe()
        try:
            p = multiprocessing.Process(target=self._test_ignore,
                                        args=(child_conn,))
            p.daemon = True
            p.start()
            child_conn.close()
            self.assertEqual(conn.recv(), 'ready')
            # First signal: sent before the parent writes anything for the
            # child to read.
            time.sleep(0.1)
            os.kill(p.pid, signal.SIGUSR1)
            time.sleep(0.1)
            conn.send(1234)
            self.assertEqual(conn.recv(), 1234)
            # Second signal: sent before the parent reads the oversized
            # message.
            time.sleep(0.1)
            os.kill(p.pid, signal.SIGUSR1)
            self.assertEqual(conn.recv_bytes(), b'x' * self.CONN_MAX_SIZE)
            time.sleep(0.1)
            p.join()
        finally:
            conn.close()

    @classmethod
    def _test_ignore_listener(cls, conn):
        # Child-process body: install a no-op SIGUSR1 handler, publish the
        # listener address, then greet the first client.
        def handler(signum, frame):
            pass
        signal.signal(signal.SIGUSR1, handler)
        l = multiprocessing.connection.Listener()
        conn.send(l.address)
        a = l.accept()
        a.send('welcome')

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_ignore_listener(self):
        conn, child_conn = multiprocessing.Pipe()
        try:
            p = multiprocessing.Process(target=self._test_ignore_listener,
                                        args=(child_conn,))
            p.daemon = True
            p.start()
            child_conn.close()
            address = conn.recv()
            # Signal the child before any client connects.
            time.sleep(0.1)
            os.kill(p.pid, signal.SIGUSR1)
            time.sleep(0.1)
            client = multiprocessing.connection.Client(address)
            self.assertEqual(client.recv(), 'welcome')
            p.join()
        finally:
            conn.close()

#
#
#

testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
                   TestStdinBadfiledescriptor, TestTimeouts, TestNoForkBomb,
                   TestFlags, TestForkAwareThreadLock, TestIgnoreEINTR]

#
#
#

def test_main(run=None):
    """Build the full suite, run it, and release the shared pools.

    *run* is a callable taking the assembled unittest.TestSuite; when it
    is None, test.support.run_unittest is used.  Raises unittest.SkipTest
    when the platform cannot create the synchronisation primitives the
    tests need.
    """
    if sys.platform.startswith("linux"):
        try:
            # Probe only: the lock itself is not needed afterwards.
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest(
                "OSError raises on RLock creation, see issue 3111!")

    check_enough_semaphores()

    if run is None:
        from test.support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    # Shared pools (and the manager process) used by the generated
    # With{Processes,Threads,Manager}* test cases through their mixin.
    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        testcases = (
            sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
        # module during these tests is at least platform dependent and possibly
        # non-deterministic on any given platform. So we don't mind if the listed
        # warnings aren't actually raised.
        with support.check_py3k_warnings(
                (".+__(get|set)slice__ has been removed", DeprecationWarning),
                (r"sys.exc_clear\(\) not supported", DeprecationWarning),
                quiet=True):
            run(suite)
    finally:
        # The runner may raise (for example to report failing tests); the
        # worker processes and the manager must be torn down regardless.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool

def main():
    """Run the suite verbosely with a plain TextTestRunner."""
    test_main(unittest.TextTestRunner(verbosity=2).run)

if __name__ == '__main__':
    main()