1# 2# Unit tests for the multiprocessing package 3# 4 5import unittest 6import Queue 7import time 8import sys 9import os 10import gc 11import signal 12import array 13import socket 14import random 15import logging 16import errno 17import test.script_helper 18from test import test_support 19from StringIO import StringIO 20_multiprocessing = test_support.import_module('_multiprocessing') 21# import threading after _multiprocessing to raise a more relevant error 22# message: "No module named _multiprocessing". _multiprocessing is not compiled 23# without thread support. 24import threading 25 26# Work around broken sem_open implementations 27test_support.import_module('multiprocessing.synchronize') 28 29import multiprocessing.dummy 30import multiprocessing.connection 31import multiprocessing.managers 32import multiprocessing.heap 33import multiprocessing.pool 34 35from multiprocessing import util 36 37try: 38 from multiprocessing import reduction 39 HAS_REDUCTION = True 40except ImportError: 41 HAS_REDUCTION = False 42 43try: 44 from multiprocessing.sharedctypes import Value, copy 45 HAS_SHAREDCTYPES = True 46except ImportError: 47 HAS_SHAREDCTYPES = False 48 49try: 50 import msvcrt 51except ImportError: 52 msvcrt = None 53 54# 55# 56# 57 58latin = str 59 60# 61# Constants 62# 63 64LOG_LEVEL = util.SUBWARNING 65#LOG_LEVEL = logging.DEBUG 66 67DELTA = 0.1 68CHECK_TIMINGS = False # making true makes tests take a lot longer 69 # and can sometimes cause some non-serious 70 # failures because some calls block a bit 71 # longer than expected 72if CHECK_TIMINGS: 73 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4 74else: 75 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1 76 77HAVE_GETVALUE = not getattr(_multiprocessing, 78 'HAVE_BROKEN_SEM_GETVALUE', False) 79 80WIN32 = (sys.platform == "win32") 81 82try: 83 MAXFD = os.sysconf("SC_OPEN_MAX") 84except: 85 MAXFD = 256 86 87# 88# Some tests require ctypes 89# 90 91try: 92 from ctypes import Structure, c_int, c_double 93except ImportError: 94 Structure = object 95 c_int = c_double = None 96 97 98def check_enough_semaphores(): 99 """Check that the system supports enough semaphores to run the test.""" 100 # minimum number of semaphores available according to POSIX 101 nsems_min = 256 102 try: 103 nsems = os.sysconf("SC_SEM_NSEMS_MAX") 104 except (AttributeError, ValueError): 105 # sysconf not available or setting not available 106 return 107 if nsems == -1 or nsems >= nsems_min: 108 return 109 raise unittest.SkipTest("The OS doesn't support enough semaphores " 110 "to run the test (required: %d)." % nsems_min) 111 112 113# 114# Creates a wrapper for a function which records the time it takes to finish 115# 116 117class TimingWrapper(object): 118 119 def __init__(self, func): 120 self.func = func 121 self.elapsed = None 122 123 def __call__(self, *args, **kwds): 124 t = time.time() 125 try: 126 return self.func(*args, **kwds) 127 finally: 128 self.elapsed = time.time() - t 129 130# 131# Base class for test cases 132# 133 134class BaseTestCase(object): 135 136 ALLOWED_TYPES = ('processes', 'manager', 'threads') 137 138 def assertTimingAlmostEqual(self, a, b): 139 if CHECK_TIMINGS: 140 self.assertAlmostEqual(a, b, 1) 141 142 def assertReturnsIfImplemented(self, value, func, *args): 143 try: 144 res = func(*args) 145 except NotImplementedError: 146 pass 147 else: 148 return self.assertEqual(value, res) 149 150 # For the sanity of Windows users, rather than crashing or freezing in 151 # multiple ways. 152 def __reduce__(self, *args): 153 raise NotImplementedError("shouldn't try to pickle a test case") 154 155 __reduce_ex__ = __reduce__ 156 157# 158# Return the value of a semaphore 159# 160 161def get_value(self): 162 try: 163 return self.get_value() 164 except AttributeError: 165 try: 166 return self._Semaphore__value 167 except AttributeError: 168 try: 169 return self._value 170 except AttributeError: 171 raise NotImplementedError 172 173# 174# Testcases 175# 176 177class _TestProcess(BaseTestCase): 178 179 ALLOWED_TYPES = ('processes', 'threads') 180 181 def test_current(self): 182 if self.TYPE == 'threads': 183 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 184 185 current = self.current_process() 186 authkey = current.authkey 187 188 self.assertTrue(current.is_alive()) 189 self.assertTrue(not current.daemon) 190 self.assertIsInstance(authkey, bytes) 191 self.assertTrue(len(authkey) > 0) 192 self.assertEqual(current.ident, os.getpid()) 193 self.assertEqual(current.exitcode, None) 194 195 @classmethod 196 def _test(cls, q, *args, **kwds): 197 current = cls.current_process() 198 q.put(args) 199 q.put(kwds) 200 q.put(current.name) 201 if cls.TYPE != 'threads': 202 q.put(bytes(current.authkey)) 203 q.put(current.pid) 204 205 def test_process(self): 206 q = self.Queue(1) 207 e = self.Event() 208 args = (q, 1, 2) 209 kwargs = {'hello':23, 'bye':2.54} 210 name = 'SomeProcess' 211 p = self.Process( 212 target=self._test, args=args, kwargs=kwargs, name=name 213 ) 214 p.daemon = True 215 current = self.current_process() 216 217 if self.TYPE != 'threads': 218 self.assertEqual(p.authkey, current.authkey) 219 self.assertEqual(p.is_alive(), False) 220 self.assertEqual(p.daemon, True) 221 self.assertNotIn(p, self.active_children()) 222 self.assertTrue(type(self.active_children()) is list) 223 self.assertEqual(p.exitcode, None) 224 225 p.start() 226 227 self.assertEqual(p.exitcode, None) 228 self.assertEqual(p.is_alive(), True) 229 self.assertIn(p, self.active_children()) 230 231 self.assertEqual(q.get(), args[1:]) 232 self.assertEqual(q.get(), kwargs) 233 self.assertEqual(q.get(), p.name) 234 if self.TYPE != 'threads': 235 self.assertEqual(q.get(), current.authkey) 236 self.assertEqual(q.get(), p.pid) 237 238 p.join() 239 240 self.assertEqual(p.exitcode, 0) 241 self.assertEqual(p.is_alive(), False) 242 self.assertNotIn(p, self.active_children()) 243 244 @classmethod 245 def _test_terminate(cls): 246 time.sleep(1000) 247 248 def test_terminate(self): 249 if self.TYPE == 'threads': 250 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 251 252 p = self.Process(target=self._test_terminate) 253 p.daemon = True 254 p.start() 255 256 self.assertEqual(p.is_alive(), True) 257 self.assertIn(p, self.active_children()) 258 self.assertEqual(p.exitcode, None) 259 260 p.terminate() 261 262 join = TimingWrapper(p.join) 263 self.assertEqual(join(), None) 264 self.assertTimingAlmostEqual(join.elapsed, 0.0) 265 266 self.assertEqual(p.is_alive(), False) 267 self.assertNotIn(p, self.active_children()) 268 269 p.join() 270 271 # XXX sometimes get p.exitcode == 0 on Windows ... 272 #self.assertEqual(p.exitcode, -signal.SIGTERM) 273 274 def test_cpu_count(self): 275 try: 276 cpus = multiprocessing.cpu_count() 277 except NotImplementedError: 278 cpus = 1 279 self.assertTrue(type(cpus) is int) 280 self.assertTrue(cpus >= 1) 281 282 def test_active_children(self): 283 self.assertEqual(type(self.active_children()), list) 284 285 p = self.Process(target=time.sleep, args=(DELTA,)) 286 self.assertNotIn(p, self.active_children()) 287 288 p.daemon = True 289 p.start() 290 self.assertIn(p, self.active_children()) 291 292 p.join() 293 self.assertNotIn(p, self.active_children()) 294 295 @classmethod 296 def _test_recursion(cls, wconn, id): 297 from multiprocessing import forking 298 wconn.send(id) 299 if len(id) < 2: 300 for i in range(2): 301 p = cls.Process( 302 target=cls._test_recursion, args=(wconn, id+[i]) 303 ) 304 p.start() 305 p.join() 306 307 def test_recursion(self): 308 rconn, wconn = self.Pipe(duplex=False) 309 self._test_recursion(wconn, []) 310 311 time.sleep(DELTA) 312 result = [] 313 while rconn.poll(): 314 result.append(rconn.recv()) 315 316 expected = [ 317 [], 318 [0], 319 [0, 0], 320 [0, 1], 321 [1], 322 [1, 0], 323 [1, 1] 324 ] 325 self.assertEqual(result, expected) 326 327 @classmethod 328 def _test_sys_exit(cls, reason, testfn): 329 sys.stderr = open(testfn, 'w') 330 sys.exit(reason) 331 332 def test_sys_exit(self): 333 # See Issue 13854 334 if self.TYPE == 'threads': 335 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 336 337 testfn = test_support.TESTFN 338 self.addCleanup(test_support.unlink, testfn) 339 340 for reason, code in (([1, 2, 3], 1), ('ignore this', 1)): 341 p = self.Process(target=self._test_sys_exit, args=(reason, testfn)) 342 p.daemon = True 343 p.start() 344 p.join(5) 345 self.assertEqual(p.exitcode, code) 346 347 with open(testfn, 'r') as f: 348 self.assertEqual(f.read().rstrip(), str(reason)) 349 350 for reason in (True, False, 8): 351 p = self.Process(target=sys.exit, args=(reason,)) 352 p.daemon = True 353 p.start() 354 p.join(5) 355 self.assertEqual(p.exitcode, reason) 356 357# 358# 359# 360 361class _UpperCaser(multiprocessing.Process): 362 363 def __init__(self): 364 multiprocessing.Process.__init__(self) 365 self.child_conn, self.parent_conn = multiprocessing.Pipe() 366 367 def run(self): 368 self.parent_conn.close() 369 for s in iter(self.child_conn.recv, None): 370 self.child_conn.send(s.upper()) 371 self.child_conn.close() 372 373 def submit(self, s): 374 assert type(s) is str 375 self.parent_conn.send(s) 376 return self.parent_conn.recv() 377 378 def stop(self): 379 self.parent_conn.send(None) 380 self.parent_conn.close() 381 self.child_conn.close() 382 383class _TestSubclassingProcess(BaseTestCase): 384 385 ALLOWED_TYPES = ('processes',) 386 387 def test_subclassing(self): 388 uppercaser = _UpperCaser() 389 uppercaser.daemon = True 390 uppercaser.start() 391 self.assertEqual(uppercaser.submit('hello'), 'HELLO') 392 self.assertEqual(uppercaser.submit('world'), 'WORLD') 393 uppercaser.stop() 394 uppercaser.join() 395 396# 397# 398# 399 400def queue_empty(q): 401 if hasattr(q, 'empty'): 402 return q.empty() 403 else: 404 return q.qsize() == 0 405 406def queue_full(q, maxsize): 407 if hasattr(q, 'full'): 408 return q.full() 409 else: 410 return q.qsize() == maxsize 411 412 413class _TestQueue(BaseTestCase): 414 415 416 @classmethod 417 def _test_put(cls, queue, child_can_start, parent_can_continue): 418 child_can_start.wait() 419 for i in range(6): 420 queue.get() 421 parent_can_continue.set() 422 423 def test_put(self): 424 MAXSIZE = 6 425 queue = self.Queue(maxsize=MAXSIZE) 426 child_can_start = self.Event() 427 parent_can_continue = self.Event() 428 429 proc = self.Process( 430 target=self._test_put, 431 args=(queue, child_can_start, parent_can_continue) 432 ) 433 proc.daemon = True 434 proc.start() 435 436 self.assertEqual(queue_empty(queue), True) 437 self.assertEqual(queue_full(queue, MAXSIZE), False) 438 439 queue.put(1) 440 queue.put(2, True) 441 queue.put(3, True, None) 442 queue.put(4, False) 443 queue.put(5, False, None) 444 queue.put_nowait(6) 445 446 # the values may be in buffer but not yet in pipe so sleep a bit 447 time.sleep(DELTA) 448 449 self.assertEqual(queue_empty(queue), False) 450 self.assertEqual(queue_full(queue, MAXSIZE), True) 451 452 put = TimingWrapper(queue.put) 453 put_nowait = TimingWrapper(queue.put_nowait) 454 455 self.assertRaises(Queue.Full, put, 7, False) 456 self.assertTimingAlmostEqual(put.elapsed, 0) 457 458 self.assertRaises(Queue.Full, put, 7, False, None) 459 self.assertTimingAlmostEqual(put.elapsed, 0) 460 461 self.assertRaises(Queue.Full, put_nowait, 7) 462 self.assertTimingAlmostEqual(put_nowait.elapsed, 0) 463 464 self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1) 465 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1) 466 467 self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2) 468 self.assertTimingAlmostEqual(put.elapsed, 0) 469 470 self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3) 471 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3) 472 473 child_can_start.set() 474 parent_can_continue.wait() 475 476 self.assertEqual(queue_empty(queue), True) 477 self.assertEqual(queue_full(queue, MAXSIZE), False) 478 479 proc.join() 480 481 @classmethod 482 def _test_get(cls, queue, child_can_start, parent_can_continue): 483 child_can_start.wait() 484 #queue.put(1) 485 queue.put(2) 486 queue.put(3) 487 queue.put(4) 488 queue.put(5) 489 parent_can_continue.set() 490 491 def test_get(self): 492 queue = self.Queue() 493 child_can_start = self.Event() 494 parent_can_continue = self.Event() 495 496 proc = self.Process( 497 target=self._test_get, 498 args=(queue, child_can_start, parent_can_continue) 499 ) 500 proc.daemon = True 501 proc.start() 502 503 self.assertEqual(queue_empty(queue), True) 504 505 child_can_start.set() 506 parent_can_continue.wait() 507 508 time.sleep(DELTA) 509 self.assertEqual(queue_empty(queue), False) 510 511 # Hangs unexpectedly, remove for now 512 #self.assertEqual(queue.get(), 1) 513 self.assertEqual(queue.get(True, None), 2) 514 self.assertEqual(queue.get(True), 3) 515 self.assertEqual(queue.get(timeout=1), 4) 516 self.assertEqual(queue.get_nowait(), 5) 517 518 self.assertEqual(queue_empty(queue), True) 519 520 get = TimingWrapper(queue.get) 521 get_nowait = TimingWrapper(queue.get_nowait) 522 523 self.assertRaises(Queue.Empty, get, False) 524 self.assertTimingAlmostEqual(get.elapsed, 0) 525 526 self.assertRaises(Queue.Empty, get, False, None) 527 self.assertTimingAlmostEqual(get.elapsed, 0) 528 529 self.assertRaises(Queue.Empty, get_nowait) 530 self.assertTimingAlmostEqual(get_nowait.elapsed, 0) 531 532 self.assertRaises(Queue.Empty, get, True, TIMEOUT1) 533 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1) 534 535 self.assertRaises(Queue.Empty, get, False, TIMEOUT2) 536 self.assertTimingAlmostEqual(get.elapsed, 0) 537 538 self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3) 539 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3) 540 541 proc.join() 542 543 @classmethod 544 def _test_fork(cls, queue): 545 for i in range(10, 20): 546 queue.put(i) 547 # note that at this point the items may only be buffered, so the 548 # process cannot shutdown until the feeder thread has finished 549 # pushing items onto the pipe. 550 551 def test_fork(self): 552 # Old versions of Queue would fail to create a new feeder 553 # thread for a forked process if the original process had its 554 # own feeder thread. This test checks that this no longer 555 # happens. 556 557 queue = self.Queue() 558 559 # put items on queue so that main process starts a feeder thread 560 for i in range(10): 561 queue.put(i) 562 563 # wait to make sure thread starts before we fork a new process 564 time.sleep(DELTA) 565 566 # fork process 567 p = self.Process(target=self._test_fork, args=(queue,)) 568 p.daemon = True 569 p.start() 570 571 # check that all expected items are in the queue 572 for i in range(20): 573 self.assertEqual(queue.get(), i) 574 self.assertRaises(Queue.Empty, queue.get, False) 575 576 p.join() 577 578 def test_qsize(self): 579 q = self.Queue() 580 try: 581 self.assertEqual(q.qsize(), 0) 582 except NotImplementedError: 583 self.skipTest('qsize method not implemented') 584 q.put(1) 585 self.assertEqual(q.qsize(), 1) 586 q.put(5) 587 self.assertEqual(q.qsize(), 2) 588 q.get() 589 self.assertEqual(q.qsize(), 1) 590 q.get() 591 self.assertEqual(q.qsize(), 0) 592 593 @classmethod 594 def _test_task_done(cls, q): 595 for obj in iter(q.get, None): 596 time.sleep(DELTA) 597 q.task_done() 598 599 def test_task_done(self): 600 queue = self.JoinableQueue() 601 602 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'): 603 self.skipTest("requires 'queue.task_done()' method") 604 605 workers = [self.Process(target=self._test_task_done, args=(queue,)) 606 for i in xrange(4)] 607 608 for p in workers: 609 p.daemon = True 610 p.start() 611 612 for i in xrange(10): 613 queue.put(i) 614 615 queue.join() 616 617 for p in workers: 618 queue.put(None) 619 620 for p in workers: 621 p.join() 622 623 def test_no_import_lock_contention(self): 624 with test_support.temp_cwd(): 625 module_name = 'imported_by_an_imported_module' 626 with open(module_name + '.py', 'w') as f: 627 f.write("""if 1: 628 import multiprocessing 629 630 q = multiprocessing.Queue() 631 q.put('knock knock') 632 q.get(timeout=3) 633 q.close() 634 """) 635 636 with test_support.DirsOnSysPath(os.getcwd()): 637 try: 638 __import__(module_name) 639 except Queue.Empty: 640 self.fail("Probable regression on import lock contention;" 641 " see Issue #22853") 642 643# 644# 645# 646 647class _TestLock(BaseTestCase): 648 649 def test_lock(self): 650 lock = self.Lock() 651 self.assertEqual(lock.acquire(), True) 652 self.assertEqual(lock.acquire(False), False) 653 self.assertEqual(lock.release(), None) 654 self.assertRaises((ValueError, threading.ThreadError), lock.release) 655 656 def test_rlock(self): 657 lock = self.RLock() 658 self.assertEqual(lock.acquire(), True) 659 self.assertEqual(lock.acquire(), True) 660 self.assertEqual(lock.acquire(), True) 661 self.assertEqual(lock.release(), None) 662 self.assertEqual(lock.release(), None) 663 self.assertEqual(lock.release(), None) 664 self.assertRaises((AssertionError, RuntimeError), lock.release) 665 666 def test_lock_context(self): 667 with self.Lock(): 668 pass 669 670 671class _TestSemaphore(BaseTestCase): 672 673 def _test_semaphore(self, sem): 674 self.assertReturnsIfImplemented(2, get_value, sem) 675 self.assertEqual(sem.acquire(), True) 676 self.assertReturnsIfImplemented(1, get_value, sem) 677 self.assertEqual(sem.acquire(), True) 678 self.assertReturnsIfImplemented(0, get_value, sem) 679 self.assertEqual(sem.acquire(False), False) 680 self.assertReturnsIfImplemented(0, get_value, sem) 681 self.assertEqual(sem.release(), None) 682 self.assertReturnsIfImplemented(1, get_value, sem) 683 self.assertEqual(sem.release(), None) 684 self.assertReturnsIfImplemented(2, get_value, sem) 685 686 def test_semaphore(self): 687 sem = self.Semaphore(2) 688 self._test_semaphore(sem) 689 self.assertEqual(sem.release(), None) 690 self.assertReturnsIfImplemented(3, get_value, sem) 691 self.assertEqual(sem.release(), None) 692 self.assertReturnsIfImplemented(4, get_value, sem) 693 694 def test_bounded_semaphore(self): 695 sem = self.BoundedSemaphore(2) 696 self._test_semaphore(sem) 697 # Currently fails on OS/X 698 #if HAVE_GETVALUE: 699 # self.assertRaises(ValueError, sem.release) 700 # self.assertReturnsIfImplemented(2, get_value, sem) 701 702 def test_timeout(self): 703 if self.TYPE != 'processes': 704 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 705 706 sem = self.Semaphore(0) 707 acquire = TimingWrapper(sem.acquire) 708 709 self.assertEqual(acquire(False), False) 710 self.assertTimingAlmostEqual(acquire.elapsed, 0.0) 711 712 self.assertEqual(acquire(False, None), False) 713 self.assertTimingAlmostEqual(acquire.elapsed, 0.0) 714 715 self.assertEqual(acquire(False, TIMEOUT1), False) 716 self.assertTimingAlmostEqual(acquire.elapsed, 0) 717 718 self.assertEqual(acquire(True, TIMEOUT2), False) 719 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2) 720 721 self.assertEqual(acquire(timeout=TIMEOUT3), False) 722 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3) 723 724 725class _TestCondition(BaseTestCase): 726 727 @classmethod 728 def f(cls, cond, sleeping, woken, timeout=None): 729 cond.acquire() 730 sleeping.release() 731 cond.wait(timeout) 732 woken.release() 733 cond.release() 734 735 def check_invariant(self, cond): 736 # this is only supposed to succeed when there are no sleepers 737 if self.TYPE == 'processes': 738 try: 739 sleepers = (cond._sleeping_count.get_value() - 740 cond._woken_count.get_value()) 741 self.assertEqual(sleepers, 0) 742 self.assertEqual(cond._wait_semaphore.get_value(), 0) 743 except NotImplementedError: 744 pass 745 746 def test_notify(self): 747 cond = self.Condition() 748 sleeping = self.Semaphore(0) 749 woken = self.Semaphore(0) 750 751 p = self.Process(target=self.f, args=(cond, sleeping, woken)) 752 p.daemon = True 753 p.start() 754 755 p = threading.Thread(target=self.f, args=(cond, sleeping, woken)) 756 p.daemon = True 757 p.start() 758 759 # wait for both children to start sleeping 760 sleeping.acquire() 761 sleeping.acquire() 762 763 # check no process/thread has woken up 764 time.sleep(DELTA) 765 self.assertReturnsIfImplemented(0, get_value, woken) 766 767 # wake up one process/thread 768 cond.acquire() 769 cond.notify() 770 cond.release() 771 772 # check one process/thread has woken up 773 time.sleep(DELTA) 774 self.assertReturnsIfImplemented(1, get_value, woken) 775 776 # wake up another 777 cond.acquire() 778 cond.notify() 779 cond.release() 780 781 # check other has woken up 782 time.sleep(DELTA) 783 self.assertReturnsIfImplemented(2, get_value, woken) 784 785 # check state is not mucked up 786 self.check_invariant(cond) 787 p.join() 788 789 def test_notify_all(self): 790 cond = self.Condition() 791 sleeping = self.Semaphore(0) 792 woken = self.Semaphore(0) 793 794 # start some threads/processes which will timeout 795 for i in range(3): 796 p = self.Process(target=self.f, 797 args=(cond, sleeping, woken, TIMEOUT1)) 798 p.daemon = True 799 p.start() 800 801 t = threading.Thread(target=self.f, 802 args=(cond, sleeping, woken, TIMEOUT1)) 803 t.daemon = True 804 t.start() 805 806 # wait for them all to sleep 807 for i in xrange(6): 808 sleeping.acquire() 809 810 # check they have all timed out 811 for i in xrange(6): 812 woken.acquire() 813 self.assertReturnsIfImplemented(0, get_value, woken) 814 815 # check state is not mucked up 816 self.check_invariant(cond) 817 818 # start some more threads/processes 819 for i in range(3): 820 p = self.Process(target=self.f, args=(cond, sleeping, woken)) 821 p.daemon = True 822 p.start() 823 824 t = threading.Thread(target=self.f, args=(cond, sleeping, woken)) 825 t.daemon = True 826 t.start() 827 828 # wait for them to all sleep 829 for i in xrange(6): 830 sleeping.acquire() 831 832 # check no process/thread has woken up 833 time.sleep(DELTA) 834 self.assertReturnsIfImplemented(0, get_value, woken) 835 836 # wake them all up 837 cond.acquire() 838 cond.notify_all() 839 cond.release() 840 841 # check they have all woken 842 time.sleep(DELTA) 843 self.assertReturnsIfImplemented(6, get_value, woken) 844 845 # check state is not mucked up 846 self.check_invariant(cond) 847 848 def test_timeout(self): 849 cond = self.Condition() 850 wait = TimingWrapper(cond.wait) 851 cond.acquire() 852 res = wait(TIMEOUT1) 853 cond.release() 854 self.assertEqual(res, None) 855 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1) 856 857 858class _TestEvent(BaseTestCase): 859 860 @classmethod 861 def _test_event(cls, event): 862 time.sleep(TIMEOUT2) 863 event.set() 864 865 def test_event(self): 866 event = self.Event() 867 wait = TimingWrapper(event.wait) 868 869 # Removed temporarily, due to API shear, this does not 870 # work with threading._Event objects. is_set == isSet 871 self.assertEqual(event.is_set(), False) 872 873 # Removed, threading.Event.wait() will return the value of the __flag 874 # instead of None. API Shear with the semaphore backed mp.Event 875 self.assertEqual(wait(0.0), False) 876 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 877 self.assertEqual(wait(TIMEOUT1), False) 878 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1) 879 880 event.set() 881 882 # See note above on the API differences 883 self.assertEqual(event.is_set(), True) 884 self.assertEqual(wait(), True) 885 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 886 self.assertEqual(wait(TIMEOUT1), True) 887 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 888 # self.assertEqual(event.is_set(), True) 889 890 event.clear() 891 892 #self.assertEqual(event.is_set(), False) 893 894 p = self.Process(target=self._test_event, args=(event,)) 895 p.daemon = True 896 p.start() 897 self.assertEqual(wait(), True) 898 899# 900# 901# 902 903class _TestValue(BaseTestCase): 904 905 ALLOWED_TYPES = ('processes',) 906 907 codes_values = [ 908 ('i', 4343, 24234), 909 ('d', 3.625, -4.25), 910 ('h', -232, 234), 911 ('c', latin('x'), latin('y')) 912 ] 913 914 def setUp(self): 915 if not HAS_SHAREDCTYPES: 916 self.skipTest("requires multiprocessing.sharedctypes") 917 918 @classmethod 919 def _test(cls, values): 920 for sv, cv in zip(values, cls.codes_values): 921 sv.value = cv[2] 922 923 924 def test_value(self, raw=False): 925 if raw: 926 values = [self.RawValue(code, value) 927 for code, value, _ in self.codes_values] 928 else: 929 values = [self.Value(code, value) 930 for code, value, _ in self.codes_values] 931 932 for sv, cv in zip(values, self.codes_values): 933 self.assertEqual(sv.value, cv[1]) 934 935 proc = self.Process(target=self._test, args=(values,)) 936 proc.daemon = True 937 proc.start() 938 proc.join() 939 940 for sv, cv in zip(values, self.codes_values): 941 self.assertEqual(sv.value, cv[2]) 942 943 def test_rawvalue(self): 944 self.test_value(raw=True) 945 946 def test_getobj_getlock(self): 947 val1 = self.Value('i', 5) 948 lock1 = val1.get_lock() 949 obj1 = val1.get_obj() 950 951 val2 = self.Value('i', 5, lock=None) 952 lock2 = val2.get_lock() 953 obj2 = val2.get_obj() 954 955 lock = self.Lock() 956 val3 = self.Value('i', 5, lock=lock) 957 lock3 = val3.get_lock() 958 obj3 = val3.get_obj() 959 self.assertEqual(lock, lock3) 960 961 arr4 = self.Value('i', 5, lock=False) 962 self.assertFalse(hasattr(arr4, 'get_lock')) 963 self.assertFalse(hasattr(arr4, 'get_obj')) 964 965 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue') 966 967 arr5 = self.RawValue('i', 5) 968 self.assertFalse(hasattr(arr5, 'get_lock')) 969 self.assertFalse(hasattr(arr5, 'get_obj')) 970 971 972class _TestArray(BaseTestCase): 973 974 ALLOWED_TYPES = ('processes',) 975 976 @classmethod 977 def f(cls, seq): 978 for i in range(1, len(seq)): 979 seq[i] += seq[i-1] 980 981 @unittest.skipIf(c_int is None, "requires _ctypes") 982 def test_array(self, raw=False): 983 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831] 984 if raw: 985 arr = self.RawArray('i', seq) 986 else: 987 arr = self.Array('i', seq) 988 989 self.assertEqual(len(arr), len(seq)) 990 self.assertEqual(arr[3], seq[3]) 991 self.assertEqual(list(arr[2:7]), list(seq[2:7])) 992 993 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4]) 994 995 self.assertEqual(list(arr[:]), seq) 996 997 self.f(seq) 998 999 p = self.Process(target=self.f, args=(arr,)) 1000 p.daemon = True 1001 p.start() 1002 p.join() 1003 1004 self.assertEqual(list(arr[:]), seq) 1005 1006 @unittest.skipIf(c_int is None, "requires _ctypes") 1007 def test_array_from_size(self): 1008 size = 10 1009 # Test for zeroing (see issue #11675). 1010 # The repetition below strengthens the test by increasing the chances 1011 # of previously allocated non-zero memory being used for the new array 1012 # on the 2nd and 3rd loops. 1013 for _ in range(3): 1014 arr = self.Array('i', size) 1015 self.assertEqual(len(arr), size) 1016 self.assertEqual(list(arr), [0] * size) 1017 arr[:] = range(10) 1018 self.assertEqual(list(arr), range(10)) 1019 del arr 1020 1021 @unittest.skipIf(c_int is None, "requires _ctypes") 1022 def test_rawarray(self): 1023 self.test_array(raw=True) 1024 1025 @unittest.skipIf(c_int is None, "requires _ctypes") 1026 def test_array_accepts_long(self): 1027 arr = self.Array('i', 10L) 1028 self.assertEqual(len(arr), 10) 1029 raw_arr = self.RawArray('i', 10L) 1030 self.assertEqual(len(raw_arr), 10) 1031 1032 @unittest.skipIf(c_int is None, "requires _ctypes") 1033 def test_getobj_getlock_obj(self): 1034 arr1 = self.Array('i', range(10)) 1035 lock1 = arr1.get_lock() 1036 obj1 = arr1.get_obj() 1037 1038 arr2 = self.Array('i', range(10), lock=None) 1039 lock2 = arr2.get_lock() 1040 obj2 = arr2.get_obj() 1041 1042 lock = self.Lock() 1043 arr3 = self.Array('i', range(10), lock=lock) 1044 lock3 = arr3.get_lock() 1045 obj3 = arr3.get_obj() 1046 self.assertEqual(lock, lock3) 1047 1048 arr4 = self.Array('i', range(10), lock=False) 1049 self.assertFalse(hasattr(arr4, 'get_lock')) 1050 self.assertFalse(hasattr(arr4, 'get_obj')) 1051 self.assertRaises(AttributeError, 1052 self.Array, 'i', range(10), lock='notalock') 1053 1054 arr5 = self.RawArray('i', range(10)) 1055 self.assertFalse(hasattr(arr5, 'get_lock')) 1056 self.assertFalse(hasattr(arr5, 'get_obj')) 1057 1058# 1059# 1060# 1061 1062class _TestContainers(BaseTestCase): 1063 1064 ALLOWED_TYPES = ('manager',) 1065 1066 def test_list(self): 1067 a = self.list(range(10)) 1068 self.assertEqual(a[:], range(10)) 1069 1070 b = self.list() 1071 self.assertEqual(b[:], []) 1072 1073 b.extend(range(5)) 1074 self.assertEqual(b[:], range(5)) 1075 1076 self.assertEqual(b[2], 2) 1077 self.assertEqual(b[2:10], [2,3,4]) 1078 1079 b *= 2 1080 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]) 1081 1082 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6]) 1083 1084 self.assertEqual(a[:], range(10)) 1085 1086 d = [a, b] 1087 e = self.list(d) 1088 self.assertEqual( 1089 e[:], 1090 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]] 1091 ) 1092 1093 f = self.list([a]) 1094 a.append('hello') 1095 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']]) 1096 1097 def test_dict(self): 1098 d = self.dict() 1099 indices = range(65, 70) 1100 for i in indices: 1101 d[i] = chr(i) 1102 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices)) 1103 self.assertEqual(sorted(d.keys()), indices) 1104 self.assertEqual(sorted(d.values()), [chr(i) for i in indices]) 1105 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices]) 1106 1107 def test_namespace(self): 1108 n = self.Namespace() 1109 n.name = 'Bob' 1110 n.job = 'Builder' 1111 n._hidden = 'hidden' 1112 self.assertEqual((n.name, n.job), ('Bob', 'Builder')) 1113 del n.job 1114 self.assertEqual(str(n), "Namespace(name='Bob')") 1115 self.assertTrue(hasattr(n, 'name')) 1116 self.assertTrue(not hasattr(n, 'job')) 1117 1118# 1119# 1120# 1121 1122def sqr(x, wait=0.0): 1123 time.sleep(wait) 1124 return x*x 1125 1126class SayWhenError(ValueError): pass 1127 1128def exception_throwing_generator(total, when): 1129 for i in range(total): 1130 if i == when: 1131 raise SayWhenError("Somebody said when") 1132 yield i 1133 1134class _TestPool(BaseTestCase): 1135 1136 def test_apply(self): 1137 papply = self.pool.apply 1138 self.assertEqual(papply(sqr, (5,)), sqr(5)) 1139 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3)) 1140 1141 def test_map(self): 1142 pmap = self.pool.map 1143 self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10))) 1144 self.assertEqual(pmap(sqr, range(100), chunksize=20), 1145 map(sqr, range(100))) 1146 1147 def test_map_unplicklable(self): 1148 # Issue #19425 -- failure to pickle should not cause a hang 1149 if self.TYPE == 'threads': 1150 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1151 class A(object): 1152 def __reduce__(self): 1153 raise RuntimeError('cannot pickle') 1154 with self.assertRaises(RuntimeError): 1155 self.pool.map(sqr, [A()]*10) 1156 1157 def test_map_chunksize(self): 1158 try: 1159 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1) 1160 except multiprocessing.TimeoutError: 1161 self.fail("pool.map_async with chunksize stalled on null list") 1162 1163 def test_async(self): 1164 res = self.pool.apply_async(sqr, (7, TIMEOUT1,)) 1165 get = TimingWrapper(res.get) 1166 self.assertEqual(get(), 49) 1167 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1) 1168 1169 def test_async_timeout(self): 1170 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0)) 1171 get = TimingWrapper(res.get) 1172 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2) 1173 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2) 1174 1175 def test_imap(self): 1176 it = self.pool.imap(sqr, range(10)) 1177 self.assertEqual(list(it), map(sqr, range(10))) 1178 1179 it = self.pool.imap(sqr, range(10)) 1180 for i in range(10): 1181 self.assertEqual(it.next(), i*i) 1182 self.assertRaises(StopIteration, it.next) 1183 1184 it = self.pool.imap(sqr, range(1000), chunksize=100) 1185 for i in range(1000): 1186 self.assertEqual(it.next(), i*i) 1187 self.assertRaises(StopIteration, it.next) 1188 1189 def test_imap_handle_iterable_exception(self): 1190 if self.TYPE == 'manager': 1191 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1192 1193 it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1) 1194 for i in range(3): 1195 self.assertEqual(next(it), i*i) 1196 self.assertRaises(SayWhenError, it.next) 1197 1198 # SayWhenError seen at start of problematic chunk's results 1199 it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2) 1200 for i in range(6): 1201 self.assertEqual(next(it), i*i) 1202 self.assertRaises(SayWhenError, it.next) 1203 it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4) 1204 for i in range(4): 1205 self.assertEqual(next(it), i*i) 1206 self.assertRaises(SayWhenError, it.next) 1207 1208 def test_imap_unordered(self): 1209 it = self.pool.imap_unordered(sqr, range(1000)) 1210 self.assertEqual(sorted(it), map(sqr, range(1000))) 1211 1212 it = self.pool.imap_unordered(sqr, range(1000), chunksize=53) 1213 self.assertEqual(sorted(it), map(sqr, range(1000))) 1214 1215 def test_imap_unordered_handle_iterable_exception(self): 1216 if self.TYPE == 'manager': 1217 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1218 1219 it = self.pool.imap_unordered(sqr, 1220 exception_throwing_generator(10, 3), 1221 1) 1222 expected_values = map(sqr, range(10)) 1223 with self.assertRaises(SayWhenError): 1224 # imap_unordered makes it difficult to anticipate the SayWhenError 1225 for i in range(10): 1226 value = next(it) 1227 self.assertIn(value, expected_values) 1228 expected_values.remove(value) 1229 1230 it = self.pool.imap_unordered(sqr, 1231 exception_throwing_generator(20, 7), 1232 2) 1233 expected_values = map(sqr, range(20)) 1234 with self.assertRaises(SayWhenError): 1235 for i in range(20): 1236 value = next(it) 1237 self.assertIn(value, expected_values) 1238 expected_values.remove(value) 1239 1240 def test_make_pool(self): 1241 self.assertRaises(ValueError, multiprocessing.Pool, -1) 1242 self.assertRaises(ValueError, multiprocessing.Pool, 0) 1243 1244 p = multiprocessing.Pool(3) 1245 self.assertEqual(3, len(p._pool)) 1246 p.close() 1247 p.join() 1248 1249 def test_terminate(self): 1250 p = self.Pool(4) 1251 result = p.map_async( 1252 time.sleep, [0.1 for i in range(10000)], chunksize=1 1253 ) 1254 p.terminate() 1255 join = TimingWrapper(p.join) 1256 join() 1257 self.assertTrue(join.elapsed < 0.2) 1258 1259 def test_empty_iterable(self): 1260 # See Issue 12157 1261 p = self.Pool(1) 1262 1263 self.assertEqual(p.map(sqr, []), []) 1264 self.assertEqual(list(p.imap(sqr, [])), []) 1265 self.assertEqual(list(p.imap_unordered(sqr, [])), []) 1266 self.assertEqual(p.map_async(sqr, []).get(), []) 1267 1268 p.close() 1269 p.join() 1270 1271def unpickleable_result(): 1272 return lambda: 42 1273 1274class _TestPoolWorkerErrors(BaseTestCase): 1275 ALLOWED_TYPES = ('processes', ) 1276 1277 def test_unpickleable_result(self): 1278 from multiprocessing.pool import MaybeEncodingError 1279 p = multiprocessing.Pool(2) 1280 1281 # Make sure we don't lose pool processes because of encoding errors. 1282 for iteration in range(20): 1283 res = p.apply_async(unpickleable_result) 1284 self.assertRaises(MaybeEncodingError, res.get) 1285 1286 p.close() 1287 p.join() 1288 1289class _TestPoolWorkerLifetime(BaseTestCase): 1290 1291 ALLOWED_TYPES = ('processes', ) 1292 def test_pool_worker_lifetime(self): 1293 p = multiprocessing.Pool(3, maxtasksperchild=10) 1294 self.assertEqual(3, len(p._pool)) 1295 origworkerpids = [w.pid for w in p._pool] 1296 # Run many tasks so each worker gets replaced (hopefully) 1297 results = [] 1298 for i in range(100): 1299 results.append(p.apply_async(sqr, (i, ))) 1300 # Fetch the results and verify we got the right answers, 1301 # also ensuring all the tasks have completed. 1302 for (j, res) in enumerate(results): 1303 self.assertEqual(res.get(), sqr(j)) 1304 # Refill the pool 1305 p._repopulate_pool() 1306 # Wait until all workers are alive 1307 # (countdown * DELTA = 5 seconds max startup process time) 1308 countdown = 50 1309 while countdown and not all(w.is_alive() for w in p._pool): 1310 countdown -= 1 1311 time.sleep(DELTA) 1312 finalworkerpids = [w.pid for w in p._pool] 1313 # All pids should be assigned. See issue #7805. 1314 self.assertNotIn(None, origworkerpids) 1315 self.assertNotIn(None, finalworkerpids) 1316 # Finally, check that the worker pids have changed 1317 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids)) 1318 p.close() 1319 p.join() 1320 1321 def test_pool_worker_lifetime_early_close(self): 1322 # Issue #10332: closing a pool whose workers have limited lifetimes 1323 # before all the tasks completed would make join() hang. 1324 p = multiprocessing.Pool(3, maxtasksperchild=1) 1325 results = [] 1326 for i in range(6): 1327 results.append(p.apply_async(sqr, (i, 0.3))) 1328 p.close() 1329 p.join() 1330 # check the results 1331 for (j, res) in enumerate(results): 1332 self.assertEqual(res.get(), sqr(j)) 1333 1334 1335# 1336# Test that manager has expected number of shared objects left 1337# 1338 1339class _TestZZZNumberOfObjects(BaseTestCase): 1340 # Because test cases are sorted alphabetically, this one will get 1341 # run after all the other tests for the manager. It tests that 1342 # there have been no "reference leaks" for the manager's shared 1343 # objects. Note the comment in _TestPool.test_terminate(). 1344 ALLOWED_TYPES = ('manager',) 1345 1346 def test_number_of_objects(self): 1347 EXPECTED_NUMBER = 1 # the pool object is still alive 1348 multiprocessing.active_children() # discard dead process objs 1349 gc.collect() # do garbage collection 1350 refs = self.manager._number_of_objects() 1351 debug_info = self.manager._debug_info() 1352 if refs != EXPECTED_NUMBER: 1353 print self.manager._debug_info() 1354 print debug_info 1355 1356 self.assertEqual(refs, EXPECTED_NUMBER) 1357 1358# 1359# Test of creating a customized manager class 1360# 1361 1362from multiprocessing.managers import BaseManager, BaseProxy, RemoteError 1363 1364class FooBar(object): 1365 def f(self): 1366 return 'f()' 1367 def g(self): 1368 raise ValueError 1369 def _h(self): 1370 return '_h()' 1371 1372def baz(): 1373 for i in xrange(10): 1374 yield i*i 1375 1376class IteratorProxy(BaseProxy): 1377 _exposed_ = ('next', '__next__') 1378 def __iter__(self): 1379 return self 1380 def next(self): 1381 return self._callmethod('next') 1382 def __next__(self): 1383 return self._callmethod('__next__') 1384 1385class MyManager(BaseManager): 1386 pass 1387 1388MyManager.register('Foo', callable=FooBar) 1389MyManager.register('Bar', callable=FooBar, exposed=('f', '_h')) 1390MyManager.register('baz', callable=baz, proxytype=IteratorProxy) 1391 1392 1393class _TestMyManager(BaseTestCase): 1394 1395 ALLOWED_TYPES = ('manager',) 1396 1397 def test_mymanager(self): 1398 manager = MyManager() 1399 manager.start() 1400 1401 foo = manager.Foo() 1402 bar = manager.Bar() 1403 baz = manager.baz() 1404 1405 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)] 1406 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)] 1407 1408 self.assertEqual(foo_methods, ['f', 'g']) 1409 self.assertEqual(bar_methods, ['f', '_h']) 1410 1411 self.assertEqual(foo.f(), 'f()') 1412 self.assertRaises(ValueError, foo.g) 1413 self.assertEqual(foo._callmethod('f'), 'f()') 1414 self.assertRaises(RemoteError, foo._callmethod, '_h') 1415 1416 self.assertEqual(bar.f(), 'f()') 1417 self.assertEqual(bar._h(), '_h()') 1418 self.assertEqual(bar._callmethod('f'), 'f()') 1419 self.assertEqual(bar._callmethod('_h'), '_h()') 1420 1421 self.assertEqual(list(baz), [i*i for i in range(10)]) 1422 1423 manager.shutdown() 1424 1425# 1426# Test of connecting to a remote server and using xmlrpclib for serialization 1427# 1428 1429_queue = Queue.Queue() 1430def get_queue(): 1431 return _queue 1432 1433class QueueManager(BaseManager): 1434 '''manager class used by server process''' 1435QueueManager.register('get_queue', callable=get_queue) 1436 1437class QueueManager2(BaseManager): 1438 '''manager class which specifies the same interface as QueueManager''' 1439QueueManager2.register('get_queue') 1440 1441 1442SERIALIZER = 'xmlrpclib' 1443 1444class _TestRemoteManager(BaseTestCase): 1445 1446 ALLOWED_TYPES = ('manager',) 1447 values = ['hello world', None, True, 2.25, 1448 #'hall\xc3\xa5 v\xc3\xa4rlden'] # UTF-8 1449 ] 1450 result = values[:] 1451 if test_support.have_unicode: 1452 #result[-1] = u'hall\xe5 v\xe4rlden' 1453 uvalue = test_support.u(r'\u043f\u0440\u0438\u0432\u0456\u0442 ' 1454 r'\u0441\u0432\u0456\u0442') 1455 values.append(uvalue) 1456 result.append(uvalue) 1457 1458 @classmethod 1459 def _putter(cls, address, authkey): 1460 manager = QueueManager2( 1461 address=address, authkey=authkey, serializer=SERIALIZER 1462 ) 1463 manager.connect() 1464 queue = manager.get_queue() 1465 # Note that xmlrpclib will deserialize object as a list not a tuple 1466 queue.put(tuple(cls.values)) 1467 1468 def test_remote(self): 1469 authkey = os.urandom(32) 1470 1471 manager = QueueManager( 1472 address=(test.test_support.HOST, 0), authkey=authkey, serializer=SERIALIZER 1473 ) 1474 manager.start() 1475 1476 p = self.Process(target=self._putter, args=(manager.address, authkey)) 1477 p.daemon = True 1478 p.start() 1479 1480 manager2 = QueueManager2( 1481 address=manager.address, authkey=authkey, serializer=SERIALIZER 1482 ) 1483 manager2.connect() 1484 queue = manager2.get_queue() 1485 1486 self.assertEqual(queue.get(), self.result) 1487 1488 # Because we are using xmlrpclib for serialization instead of 1489 # pickle this will cause a serialization error. 1490 self.assertRaises(Exception, queue.put, time.sleep) 1491 1492 # Make queue finalizer run before the server is stopped 1493 del queue 1494 manager.shutdown() 1495 1496class _TestManagerRestart(BaseTestCase): 1497 1498 @classmethod 1499 def _putter(cls, address, authkey): 1500 manager = QueueManager( 1501 address=address, authkey=authkey, serializer=SERIALIZER) 1502 manager.connect() 1503 queue = manager.get_queue() 1504 queue.put('hello world') 1505 1506 def test_rapid_restart(self): 1507 authkey = os.urandom(32) 1508 manager = QueueManager( 1509 address=(test.test_support.HOST, 0), authkey=authkey, serializer=SERIALIZER) 1510 srvr = manager.get_server() 1511 addr = srvr.address 1512 # Close the connection.Listener socket which gets opened as a part 1513 # of manager.get_server(). It's not needed for the test. 1514 srvr.listener.close() 1515 manager.start() 1516 1517 p = self.Process(target=self._putter, args=(manager.address, authkey)) 1518 p.daemon = True 1519 p.start() 1520 queue = manager.get_queue() 1521 self.assertEqual(queue.get(), 'hello world') 1522 del queue 1523 manager.shutdown() 1524 manager = QueueManager( 1525 address=addr, authkey=authkey, serializer=SERIALIZER) 1526 manager.start() 1527 manager.shutdown() 1528 1529# 1530# 1531# 1532 1533SENTINEL = latin('') 1534 1535class _TestConnection(BaseTestCase): 1536 1537 ALLOWED_TYPES = ('processes', 'threads') 1538 1539 @classmethod 1540 def _echo(cls, conn): 1541 for msg in iter(conn.recv_bytes, SENTINEL): 1542 conn.send_bytes(msg) 1543 conn.close() 1544 1545 def test_connection(self): 1546 conn, child_conn = self.Pipe() 1547 1548 p = self.Process(target=self._echo, args=(child_conn,)) 1549 p.daemon = True 1550 p.start() 1551 1552 seq = [1, 2.25, None] 1553 msg = latin('hello world') 1554 longmsg = msg * 10 1555 arr = array.array('i', range(4)) 1556 1557 if self.TYPE == 'processes': 1558 self.assertEqual(type(conn.fileno()), int) 1559 1560 self.assertEqual(conn.send(seq), None) 1561 self.assertEqual(conn.recv(), seq) 1562 1563 self.assertEqual(conn.send_bytes(msg), None) 1564 self.assertEqual(conn.recv_bytes(), msg) 1565 1566 if self.TYPE == 'processes': 1567 buffer = array.array('i', [0]*10) 1568 expected = list(arr) + [0] * (10 - len(arr)) 1569 self.assertEqual(conn.send_bytes(arr), None) 1570 self.assertEqual(conn.recv_bytes_into(buffer), 1571 len(arr) * buffer.itemsize) 1572 self.assertEqual(list(buffer), expected) 1573 1574 buffer = array.array('i', [0]*10) 1575 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr)) 1576 self.assertEqual(conn.send_bytes(arr), None) 1577 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize), 1578 len(arr) * buffer.itemsize) 1579 self.assertEqual(list(buffer), expected) 1580 1581 buffer = bytearray(latin(' ' * 40)) 1582 self.assertEqual(conn.send_bytes(longmsg), None) 1583 try: 1584 res = conn.recv_bytes_into(buffer) 1585 except multiprocessing.BufferTooShort, e: 1586 self.assertEqual(e.args, (longmsg,)) 1587 else: 1588 self.fail('expected BufferTooShort, got %s' % res) 1589 1590 poll = TimingWrapper(conn.poll) 1591 1592 self.assertEqual(poll(), False) 1593 self.assertTimingAlmostEqual(poll.elapsed, 0) 1594 1595 self.assertEqual(poll(TIMEOUT1), False) 1596 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1) 1597 1598 conn.send(None) 1599 time.sleep(.1) 1600 1601 self.assertEqual(poll(TIMEOUT1), True) 1602 self.assertTimingAlmostEqual(poll.elapsed, 0) 1603 1604 self.assertEqual(conn.recv(), None) 1605 1606 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb 1607 conn.send_bytes(really_big_msg) 1608 self.assertEqual(conn.recv_bytes(), really_big_msg) 1609 1610 conn.send_bytes(SENTINEL) # tell child to quit 1611 child_conn.close() 1612 1613 if self.TYPE == 'processes': 1614 self.assertEqual(conn.readable, True) 1615 self.assertEqual(conn.writable, True) 1616 self.assertRaises(EOFError, conn.recv) 1617 self.assertRaises(EOFError, conn.recv_bytes) 1618 1619 p.join() 1620 1621 def test_duplex_false(self): 1622 reader, writer = self.Pipe(duplex=False) 1623 self.assertEqual(writer.send(1), None) 1624 self.assertEqual(reader.recv(), 1) 1625 if self.TYPE == 'processes': 1626 self.assertEqual(reader.readable, True) 1627 self.assertEqual(reader.writable, False) 1628 self.assertEqual(writer.readable, False) 1629 self.assertEqual(writer.writable, True) 1630 self.assertRaises(IOError, reader.send, 2) 1631 self.assertRaises(IOError, writer.recv) 1632 self.assertRaises(IOError, writer.poll) 1633 1634 def test_spawn_close(self): 1635 # We test that a pipe connection can be closed by parent 1636 # process immediately after child is spawned. On Windows this 1637 # would have sometimes failed on old versions because 1638 # child_conn would be closed before the child got a chance to 1639 # duplicate it. 1640 conn, child_conn = self.Pipe() 1641 1642 p = self.Process(target=self._echo, args=(child_conn,)) 1643 p.daemon = True 1644 p.start() 1645 child_conn.close() # this might complete before child initializes 1646 1647 msg = latin('hello') 1648 conn.send_bytes(msg) 1649 self.assertEqual(conn.recv_bytes(), msg) 1650 1651 conn.send_bytes(SENTINEL) 1652 conn.close() 1653 p.join() 1654 1655 def test_sendbytes(self): 1656 if self.TYPE != 'processes': 1657 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1658 1659 msg = latin('abcdefghijklmnopqrstuvwxyz') 1660 a, b = self.Pipe() 1661 1662 a.send_bytes(msg) 1663 self.assertEqual(b.recv_bytes(), msg) 1664 1665 a.send_bytes(msg, 5) 1666 self.assertEqual(b.recv_bytes(), msg[5:]) 1667 1668 a.send_bytes(msg, 7, 8) 1669 self.assertEqual(b.recv_bytes(), msg[7:7+8]) 1670 1671 a.send_bytes(msg, 26) 1672 self.assertEqual(b.recv_bytes(), latin('')) 1673 1674 a.send_bytes(msg, 26, 0) 1675 self.assertEqual(b.recv_bytes(), latin('')) 1676 1677 self.assertRaises(ValueError, a.send_bytes, msg, 27) 1678 1679 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5) 1680 1681 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1) 1682 1683 self.assertRaises(ValueError, a.send_bytes, msg, -1) 1684 1685 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1) 1686 1687 @classmethod 1688 def _is_fd_assigned(cls, fd): 1689 try: 1690 os.fstat(fd) 1691 except OSError as e: 1692 if e.errno == errno.EBADF: 1693 return False 1694 raise 1695 else: 1696 return True 1697 1698 @classmethod 1699 def _writefd(cls, conn, data, create_dummy_fds=False): 1700 if create_dummy_fds: 1701 for i in range(0, 256): 1702 if not cls._is_fd_assigned(i): 1703 os.dup2(conn.fileno(), i) 1704 fd = reduction.recv_handle(conn) 1705 if msvcrt: 1706 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY) 1707 os.write(fd, data) 1708 os.close(fd) 1709 1710 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 1711 def test_fd_transfer(self): 1712 if self.TYPE != 'processes': 1713 self.skipTest("only makes sense with processes") 1714 conn, child_conn = self.Pipe(duplex=True) 1715 1716 p = self.Process(target=self._writefd, args=(child_conn, b"foo")) 1717 p.daemon = True 1718 p.start() 1719 with open(test_support.TESTFN, "wb") as f: 1720 fd = f.fileno() 1721 if msvcrt: 1722 fd = msvcrt.get_osfhandle(fd) 1723 reduction.send_handle(conn, fd, p.pid) 1724 p.join() 1725 with open(test_support.TESTFN, "rb") as f: 1726 self.assertEqual(f.read(), b"foo") 1727 1728 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 1729 @unittest.skipIf(sys.platform == "win32", 1730 "test semantics don't make sense on Windows") 1731 @unittest.skipIf(MAXFD <= 256, 1732 "largest assignable fd number is too small") 1733 @unittest.skipUnless(hasattr(os, "dup2"), 1734 "test needs os.dup2()") 1735 def test_large_fd_transfer(self): 1736 # With fd > 256 (issue #11657) 1737 if self.TYPE != 'processes': 1738 self.skipTest("only makes sense with processes") 1739 conn, child_conn = self.Pipe(duplex=True) 1740 1741 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True)) 1742 p.daemon = True 1743 p.start() 1744 with open(test_support.TESTFN, "wb") as f: 1745 fd = f.fileno() 1746 for newfd in range(256, MAXFD): 1747 if not self._is_fd_assigned(newfd): 1748 break 1749 else: 1750 self.fail("could not find an unassigned large file descriptor") 1751 os.dup2(fd, newfd) 1752 try: 1753 reduction.send_handle(conn, newfd, p.pid) 1754 finally: 1755 os.close(newfd) 1756 p.join() 1757 with open(test_support.TESTFN, "rb") as f: 1758 self.assertEqual(f.read(), b"bar") 1759 1760 @classmethod 1761 def _send_data_without_fd(self, conn): 1762 os.write(conn.fileno(), b"\0") 1763 1764 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 1765 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows") 1766 def test_missing_fd_transfer(self): 1767 # Check that exception is raised when received data is not 1768 # accompanied by a file descriptor in ancillary data. 1769 if self.TYPE != 'processes': 1770 self.skipTest("only makes sense with processes") 1771 conn, child_conn = self.Pipe(duplex=True) 1772 1773 p = self.Process(target=self._send_data_without_fd, args=(child_conn,)) 1774 p.daemon = True 1775 p.start() 1776 self.assertRaises(RuntimeError, reduction.recv_handle, conn) 1777 p.join() 1778 1779class _TestListenerClient(BaseTestCase): 1780 1781 ALLOWED_TYPES = ('processes', 'threads') 1782 1783 @classmethod 1784 def _test(cls, address): 1785 conn = cls.connection.Client(address) 1786 conn.send('hello') 1787 conn.close() 1788 1789 def test_listener_client(self): 1790 for family in self.connection.families: 1791 l = self.connection.Listener(family=family) 1792 p = self.Process(target=self._test, args=(l.address,)) 1793 p.daemon = True 1794 p.start() 1795 conn = l.accept() 1796 self.assertEqual(conn.recv(), 'hello') 1797 p.join() 1798 l.close() 1799 1800 def test_issue14725(self): 1801 l = self.connection.Listener() 1802 p = self.Process(target=self._test, args=(l.address,)) 1803 p.daemon = True 1804 p.start() 1805 time.sleep(1) 1806 # On Windows the client process should by now have connected, 1807 # written data and closed the pipe handle by now. This causes 1808 # ConnectNamdedPipe() to fail with ERROR_NO_DATA. See Issue 1809 # 14725. 1810 conn = l.accept() 1811 self.assertEqual(conn.recv(), 'hello') 1812 conn.close() 1813 p.join() 1814 l.close() 1815 1816# 1817# Test of sending connection and socket objects between processes 1818# 1819""" 1820class _TestPicklingConnections(BaseTestCase): 1821 1822 ALLOWED_TYPES = ('processes',) 1823 1824 def _listener(self, conn, families): 1825 for fam in families: 1826 l = self.connection.Listener(family=fam) 1827 conn.send(l.address) 1828 new_conn = l.accept() 1829 conn.send(new_conn) 1830 1831 if self.TYPE == 'processes': 1832 l = socket.socket() 1833 l.bind(('localhost', 0)) 1834 conn.send(l.getsockname()) 1835 l.listen(1) 1836 new_conn, addr = l.accept() 1837 conn.send(new_conn) 1838 1839 conn.recv() 1840 1841 def _remote(self, conn): 1842 for (address, msg) in iter(conn.recv, None): 1843 client = self.connection.Client(address) 1844 client.send(msg.upper()) 1845 client.close() 1846 1847 if self.TYPE == 'processes': 1848 address, msg = conn.recv() 1849 client = socket.socket() 1850 client.connect(address) 1851 client.sendall(msg.upper()) 1852 client.close() 1853 1854 conn.close() 1855 1856 def test_pickling(self): 1857 try: 1858 multiprocessing.allow_connection_pickling() 1859 except ImportError: 1860 return 1861 1862 families = self.connection.families 1863 1864 lconn, lconn0 = self.Pipe() 1865 lp = self.Process(target=self._listener, args=(lconn0, families)) 1866 lp.daemon = True 1867 lp.start() 1868 lconn0.close() 1869 1870 rconn, rconn0 = self.Pipe() 1871 rp = self.Process(target=self._remote, args=(rconn0,)) 1872 rp.daemon = True 1873 rp.start() 1874 rconn0.close() 1875 1876 for fam in families: 1877 msg = ('This connection uses family %s' % fam).encode('ascii') 1878 address = lconn.recv() 1879 rconn.send((address, msg)) 1880 new_conn = lconn.recv() 1881 self.assertEqual(new_conn.recv(), msg.upper()) 1882 1883 rconn.send(None) 1884 1885 if self.TYPE == 'processes': 1886 msg = latin('This connection uses a normal socket') 1887 address = lconn.recv() 1888 rconn.send((address, msg)) 1889 if hasattr(socket, 'fromfd'): 1890 new_conn = lconn.recv() 1891 self.assertEqual(new_conn.recv(100), msg.upper()) 1892 else: 1893 # XXX On Windows with Py2.6 need to backport fromfd() 1894 discard = lconn.recv_bytes() 1895 1896 lconn.send(None) 1897 1898 rconn.close() 1899 lconn.close() 1900 1901 lp.join() 1902 rp.join() 1903""" 1904# 1905# 1906# 1907 1908class _TestHeap(BaseTestCase): 1909 1910 ALLOWED_TYPES = ('processes',) 1911 1912 def test_heap(self): 1913 iterations = 5000 1914 maxblocks = 50 1915 blocks = [] 1916 1917 # create and destroy lots of blocks of different sizes 1918 for i in xrange(iterations): 1919 size = int(random.lognormvariate(0, 1) * 1000) 1920 b = multiprocessing.heap.BufferWrapper(size) 1921 blocks.append(b) 1922 if len(blocks) > maxblocks: 1923 i = random.randrange(maxblocks) 1924 del blocks[i] 1925 1926 # get the heap object 1927 heap = multiprocessing.heap.BufferWrapper._heap 1928 1929 # verify the state of the heap 1930 all = [] 1931 occupied = 0 1932 heap._lock.acquire() 1933 self.addCleanup(heap._lock.release) 1934 for L in heap._len_to_seq.values(): 1935 for arena, start, stop in L: 1936 all.append((heap._arenas.index(arena), start, stop, 1937 stop-start, 'free')) 1938 for arena, start, stop in heap._allocated_blocks: 1939 all.append((heap._arenas.index(arena), start, stop, 1940 stop-start, 'occupied')) 1941 occupied += (stop-start) 1942 1943 all.sort() 1944 1945 for i in range(len(all)-1): 1946 (arena, start, stop) = all[i][:3] 1947 (narena, nstart, nstop) = all[i+1][:3] 1948 self.assertTrue((arena != narena and nstart == 0) or 1949 (stop == nstart)) 1950 1951 def test_free_from_gc(self): 1952 # Check that freeing of blocks by the garbage collector doesn't deadlock 1953 # (issue #12352). 1954 # Make sure the GC is enabled, and set lower collection thresholds to 1955 # make collections more frequent (and increase the probability of 1956 # deadlock). 1957 if not gc.isenabled(): 1958 gc.enable() 1959 self.addCleanup(gc.disable) 1960 thresholds = gc.get_threshold() 1961 self.addCleanup(gc.set_threshold, *thresholds) 1962 gc.set_threshold(10) 1963 1964 # perform numerous block allocations, with cyclic references to make 1965 # sure objects are collected asynchronously by the gc 1966 for i in range(5000): 1967 a = multiprocessing.heap.BufferWrapper(1) 1968 b = multiprocessing.heap.BufferWrapper(1) 1969 # circular references 1970 a.buddy = b 1971 b.buddy = a 1972 1973# 1974# 1975# 1976 1977class _Foo(Structure): 1978 _fields_ = [ 1979 ('x', c_int), 1980 ('y', c_double) 1981 ] 1982 1983class _TestSharedCTypes(BaseTestCase): 1984 1985 ALLOWED_TYPES = ('processes',) 1986 1987 def setUp(self): 1988 if not HAS_SHAREDCTYPES: 1989 self.skipTest("requires multiprocessing.sharedctypes") 1990 1991 @classmethod 1992 def _double(cls, x, y, foo, arr, string): 1993 x.value *= 2 1994 y.value *= 2 1995 foo.x *= 2 1996 foo.y *= 2 1997 string.value *= 2 1998 for i in range(len(arr)): 1999 arr[i] *= 2 2000 2001 def test_sharedctypes(self, lock=False): 2002 x = Value('i', 7, lock=lock) 2003 y = Value(c_double, 1.0/3.0, lock=lock) 2004 foo = Value(_Foo, 3, 2, lock=lock) 2005 arr = self.Array('d', range(10), lock=lock) 2006 string = self.Array('c', 20, lock=lock) 2007 string.value = latin('hello') 2008 2009 p = self.Process(target=self._double, args=(x, y, foo, arr, string)) 2010 p.daemon = True 2011 p.start() 2012 p.join() 2013 2014 self.assertEqual(x.value, 14) 2015 self.assertAlmostEqual(y.value, 2.0/3.0) 2016 self.assertEqual(foo.x, 6) 2017 self.assertAlmostEqual(foo.y, 4.0) 2018 for i in range(10): 2019 self.assertAlmostEqual(arr[i], i*2) 2020 self.assertEqual(string.value, latin('hellohello')) 2021 2022 def test_synchronize(self): 2023 self.test_sharedctypes(lock=True) 2024 2025 def test_copy(self): 2026 foo = _Foo(2, 5.0) 2027 bar = copy(foo) 2028 foo.x = 0 2029 foo.y = 0 2030 self.assertEqual(bar.x, 2) 2031 self.assertAlmostEqual(bar.y, 5.0) 2032 2033# 2034# 2035# 2036 2037class _TestFinalize(BaseTestCase): 2038 2039 ALLOWED_TYPES = ('processes',) 2040 2041 @classmethod 2042 def _test_finalize(cls, conn): 2043 class Foo(object): 2044 pass 2045 2046 a = Foo() 2047 util.Finalize(a, conn.send, args=('a',)) 2048 del a # triggers callback for a 2049 2050 b = Foo() 2051 close_b = util.Finalize(b, conn.send, args=('b',)) 2052 close_b() # triggers callback for b 2053 close_b() # does nothing because callback has already been called 2054 del b # does nothing because callback has already been called 2055 2056 c = Foo() 2057 util.Finalize(c, conn.send, args=('c',)) 2058 2059 d10 = Foo() 2060 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1) 2061 2062 d01 = Foo() 2063 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0) 2064 d02 = Foo() 2065 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0) 2066 d03 = Foo() 2067 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0) 2068 2069 util.Finalize(None, conn.send, args=('e',), exitpriority=-10) 2070 2071 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100) 2072 2073 # call multiprocessing's cleanup function then exit process without 2074 # garbage collecting locals 2075 util._exit_function() 2076 conn.close() 2077 os._exit(0) 2078 2079 def test_finalize(self): 2080 conn, child_conn = self.Pipe() 2081 2082 p = self.Process(target=self._test_finalize, args=(child_conn,)) 2083 p.daemon = True 2084 p.start() 2085 p.join() 2086 2087 result = [obj for obj in iter(conn.recv, 'STOP')] 2088 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e']) 2089 2090# 2091# Test that from ... import * works for each module 2092# 2093 2094class _TestImportStar(BaseTestCase): 2095 2096 ALLOWED_TYPES = ('processes',) 2097 2098 def test_import(self): 2099 modules = [ 2100 'multiprocessing', 'multiprocessing.connection', 2101 'multiprocessing.heap', 'multiprocessing.managers', 2102 'multiprocessing.pool', 'multiprocessing.process', 2103 'multiprocessing.synchronize', 'multiprocessing.util' 2104 ] 2105 2106 if HAS_REDUCTION: 2107 modules.append('multiprocessing.reduction') 2108 2109 if c_int is not None: 2110 # This module requires _ctypes 2111 modules.append('multiprocessing.sharedctypes') 2112 2113 for name in modules: 2114 __import__(name) 2115 mod = sys.modules[name] 2116 2117 for attr in getattr(mod, '__all__', ()): 2118 self.assertTrue( 2119 hasattr(mod, attr), 2120 '%r does not have attribute %r' % (mod, attr) 2121 ) 2122 2123# 2124# Quick test that logging works -- does not test logging output 2125# 2126 2127class _TestLogging(BaseTestCase): 2128 2129 ALLOWED_TYPES = ('processes',) 2130 2131 def test_enable_logging(self): 2132 logger = multiprocessing.get_logger() 2133 logger.setLevel(util.SUBWARNING) 2134 self.assertTrue(logger is not None) 2135 logger.debug('this will not be printed') 2136 logger.info('nor will this') 2137 logger.setLevel(LOG_LEVEL) 2138 2139 @classmethod 2140 def _test_level(cls, conn): 2141 logger = multiprocessing.get_logger() 2142 conn.send(logger.getEffectiveLevel()) 2143 2144 def test_level(self): 2145 LEVEL1 = 32 2146 LEVEL2 = 37 2147 2148 logger = multiprocessing.get_logger() 2149 root_logger = logging.getLogger() 2150 root_level = root_logger.level 2151 2152 reader, writer = multiprocessing.Pipe(duplex=False) 2153 2154 logger.setLevel(LEVEL1) 2155 p = self.Process(target=self._test_level, args=(writer,)) 2156 p.daemon = True 2157 p.start() 2158 self.assertEqual(LEVEL1, reader.recv()) 2159 2160 logger.setLevel(logging.NOTSET) 2161 root_logger.setLevel(LEVEL2) 2162 p = self.Process(target=self._test_level, args=(writer,)) 2163 p.daemon = True 2164 p.start() 2165 self.assertEqual(LEVEL2, reader.recv()) 2166 2167 root_logger.setLevel(root_level) 2168 logger.setLevel(level=LOG_LEVEL) 2169 2170 2171# class _TestLoggingProcessName(BaseTestCase): 2172# 2173# def handle(self, record): 2174# assert record.processName == multiprocessing.current_process().name 2175# self.__handled = True 2176# 2177# def test_logging(self): 2178# handler = logging.Handler() 2179# handler.handle = self.handle 2180# self.__handled = False 2181# # Bypass getLogger() and side-effects 2182# logger = logging.getLoggerClass()( 2183# 'multiprocessing.test.TestLoggingProcessName') 2184# logger.addHandler(handler) 2185# logger.propagate = False 2186# 2187# logger.warn('foo') 2188# assert self.__handled 2189 2190# 2191# Check that Process.join() retries if os.waitpid() fails with EINTR 2192# 2193 2194class _TestPollEintr(BaseTestCase): 2195 2196 ALLOWED_TYPES = ('processes',) 2197 2198 @classmethod 2199 def _killer(cls, pid): 2200 time.sleep(0.5) 2201 os.kill(pid, signal.SIGUSR1) 2202 2203 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') 2204 def test_poll_eintr(self): 2205 got_signal = [False] 2206 def record(*args): 2207 got_signal[0] = True 2208 pid = os.getpid() 2209 oldhandler = signal.signal(signal.SIGUSR1, record) 2210 try: 2211 killer = self.Process(target=self._killer, args=(pid,)) 2212 killer.start() 2213 p = self.Process(target=time.sleep, args=(1,)) 2214 p.start() 2215 p.join() 2216 self.assertTrue(got_signal[0]) 2217 self.assertEqual(p.exitcode, 0) 2218 killer.join() 2219 finally: 2220 signal.signal(signal.SIGUSR1, oldhandler) 2221 2222# 2223# Test to verify handle verification, see issue 3321 2224# 2225 2226class TestInvalidHandle(unittest.TestCase): 2227 2228 @unittest.skipIf(WIN32, "skipped on Windows") 2229 def test_invalid_handles(self): 2230 conn = _multiprocessing.Connection(44977608) 2231 self.assertRaises(IOError, conn.poll) 2232 self.assertRaises(IOError, _multiprocessing.Connection, -1) 2233 2234# 2235# Functions used to create test cases from the base ones in this module 2236# 2237 2238def get_attributes(Source, names): 2239 d = {} 2240 for name in names: 2241 obj = getattr(Source, name) 2242 if type(obj) == type(get_attributes): 2243 obj = staticmethod(obj) 2244 d[name] = obj 2245 return d 2246 2247def create_test_cases(Mixin, type): 2248 result = {} 2249 glob = globals() 2250 Type = type.capitalize() 2251 2252 for name in glob.keys(): 2253 if name.startswith('_Test'): 2254 base = glob[name] 2255 if type in base.ALLOWED_TYPES: 2256 newname = 'With' + Type + name[1:] 2257 class Temp(base, unittest.TestCase, Mixin): 2258 pass 2259 result[newname] = Temp 2260 Temp.__name__ = newname 2261 Temp.__module__ = Mixin.__module__ 2262 return result 2263 2264# 2265# Create test cases 2266# 2267 2268class ProcessesMixin(object): 2269 TYPE = 'processes' 2270 Process = multiprocessing.Process 2271 locals().update(get_attributes(multiprocessing, ( 2272 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 2273 'Condition', 'Event', 'Value', 'Array', 'RawValue', 2274 'RawArray', 'current_process', 'active_children', 'Pipe', 2275 'connection', 'JoinableQueue', 'Pool' 2276 ))) 2277 2278testcases_processes = create_test_cases(ProcessesMixin, type='processes') 2279globals().update(testcases_processes) 2280 2281 2282class ManagerMixin(object): 2283 TYPE = 'manager' 2284 Process = multiprocessing.Process 2285 manager = object.__new__(multiprocessing.managers.SyncManager) 2286 locals().update(get_attributes(manager, ( 2287 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 2288 'Condition', 'Event', 'Value', 'Array', 'list', 'dict', 2289 'Namespace', 'JoinableQueue', 'Pool' 2290 ))) 2291 2292testcases_manager = create_test_cases(ManagerMixin, type='manager') 2293globals().update(testcases_manager) 2294 2295 2296class ThreadsMixin(object): 2297 TYPE = 'threads' 2298 Process = multiprocessing.dummy.Process 2299 locals().update(get_attributes(multiprocessing.dummy, ( 2300 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 2301 'Condition', 'Event', 'Value', 'Array', 'current_process', 2302 'active_children', 'Pipe', 'connection', 'dict', 'list', 2303 'Namespace', 'JoinableQueue', 'Pool' 2304 ))) 2305 2306testcases_threads = create_test_cases(ThreadsMixin, type='threads') 2307globals().update(testcases_threads) 2308 2309class OtherTest(unittest.TestCase): 2310 # TODO: add more tests for deliver/answer challenge. 2311 def test_deliver_challenge_auth_failure(self): 2312 class _FakeConnection(object): 2313 def recv_bytes(self, size): 2314 return b'something bogus' 2315 def send_bytes(self, data): 2316 pass 2317 self.assertRaises(multiprocessing.AuthenticationError, 2318 multiprocessing.connection.deliver_challenge, 2319 _FakeConnection(), b'abc') 2320 2321 def test_answer_challenge_auth_failure(self): 2322 class _FakeConnection(object): 2323 def __init__(self): 2324 self.count = 0 2325 def recv_bytes(self, size): 2326 self.count += 1 2327 if self.count == 1: 2328 return multiprocessing.connection.CHALLENGE 2329 elif self.count == 2: 2330 return b'something bogus' 2331 return b'' 2332 def send_bytes(self, data): 2333 pass 2334 self.assertRaises(multiprocessing.AuthenticationError, 2335 multiprocessing.connection.answer_challenge, 2336 _FakeConnection(), b'abc') 2337 2338# 2339# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585 2340# 2341 2342def initializer(ns): 2343 ns.test += 1 2344 2345class TestInitializers(unittest.TestCase): 2346 def setUp(self): 2347 self.mgr = multiprocessing.Manager() 2348 self.ns = self.mgr.Namespace() 2349 self.ns.test = 0 2350 2351 def tearDown(self): 2352 self.mgr.shutdown() 2353 2354 def test_manager_initializer(self): 2355 m = multiprocessing.managers.SyncManager() 2356 self.assertRaises(TypeError, m.start, 1) 2357 m.start(initializer, (self.ns,)) 2358 self.assertEqual(self.ns.test, 1) 2359 m.shutdown() 2360 2361 def test_pool_initializer(self): 2362 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1) 2363 p = multiprocessing.Pool(1, initializer, (self.ns,)) 2364 p.close() 2365 p.join() 2366 self.assertEqual(self.ns.test, 1) 2367 2368# 2369# Issue 5155, 5313, 5331: Test process in processes 2370# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior 2371# 2372 2373def _this_sub_process(q): 2374 try: 2375 item = q.get(block=False) 2376 except Queue.Empty: 2377 pass 2378 2379def _test_process(q): 2380 queue = multiprocessing.Queue() 2381 subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,)) 2382 subProc.daemon = True 2383 subProc.start() 2384 subProc.join() 2385 2386def _afunc(x): 2387 return x*x 2388 2389def pool_in_process(): 2390 pool = multiprocessing.Pool(processes=4) 2391 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7]) 2392 2393class _file_like(object): 2394 def __init__(self, delegate): 2395 self._delegate = delegate 2396 self._pid = None 2397 2398 @property 2399 def cache(self): 2400 pid = os.getpid() 2401 # There are no race conditions since fork keeps only the running thread 2402 if pid != self._pid: 2403 self._pid = pid 2404 self._cache = [] 2405 return self._cache 2406 2407 def write(self, data): 2408 self.cache.append(data) 2409 2410 def flush(self): 2411 self._delegate.write(''.join(self.cache)) 2412 self._cache = [] 2413 2414class TestStdinBadfiledescriptor(unittest.TestCase): 2415 2416 def test_queue_in_process(self): 2417 queue = multiprocessing.Queue() 2418 proc = multiprocessing.Process(target=_test_process, args=(queue,)) 2419 proc.start() 2420 proc.join() 2421 2422 def test_pool_in_process(self): 2423 p = multiprocessing.Process(target=pool_in_process) 2424 p.start() 2425 p.join() 2426 2427 def test_flushing(self): 2428 sio = StringIO() 2429 flike = _file_like(sio) 2430 flike.write('foo') 2431 proc = multiprocessing.Process(target=lambda: flike.flush()) 2432 flike.flush() 2433 assert sio.getvalue() == 'foo' 2434 2435# 2436# Test interaction with socket timeouts - see Issue #6056 2437# 2438 2439class TestTimeouts(unittest.TestCase): 2440 @classmethod 2441 def _test_timeout(cls, child, address): 2442 time.sleep(1) 2443 child.send(123) 2444 child.close() 2445 conn = multiprocessing.connection.Client(address) 2446 conn.send(456) 2447 conn.close() 2448 2449 def test_timeout(self): 2450 old_timeout = socket.getdefaulttimeout() 2451 try: 2452 socket.setdefaulttimeout(0.1) 2453 parent, child = multiprocessing.Pipe(duplex=True) 2454 l = multiprocessing.connection.Listener(family='AF_INET') 2455 p = multiprocessing.Process(target=self._test_timeout, 2456 args=(child, l.address)) 2457 p.start() 2458 child.close() 2459 self.assertEqual(parent.recv(), 123) 2460 parent.close() 2461 conn = l.accept() 2462 self.assertEqual(conn.recv(), 456) 2463 conn.close() 2464 l.close() 2465 p.join(10) 2466 finally: 2467 socket.setdefaulttimeout(old_timeout) 2468 2469# 2470# Test what happens with no "if __name__ == '__main__'" 2471# 2472 2473class TestNoForkBomb(unittest.TestCase): 2474 def test_noforkbomb(self): 2475 name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py') 2476 if WIN32: 2477 rc, out, err = test.script_helper.assert_python_failure(name) 2478 self.assertEqual(out, '') 2479 self.assertIn('RuntimeError', err) 2480 else: 2481 rc, out, err = test.script_helper.assert_python_ok(name) 2482 self.assertEqual(out.rstrip(), '123') 2483 self.assertEqual(err, '') 2484 2485# 2486# Issue 12098: check sys.flags of child matches that for parent 2487# 2488 2489class TestFlags(unittest.TestCase): 2490 @classmethod 2491 def run_in_grandchild(cls, conn): 2492 conn.send(tuple(sys.flags)) 2493 2494 @classmethod 2495 def run_in_child(cls): 2496 import json 2497 r, w = multiprocessing.Pipe(duplex=False) 2498 p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,)) 2499 p.start() 2500 grandchild_flags = r.recv() 2501 p.join() 2502 r.close() 2503 w.close() 2504 flags = (tuple(sys.flags), grandchild_flags) 2505 print(json.dumps(flags)) 2506 2507 @test_support.requires_unicode # XXX json needs unicode support 2508 def test_flags(self): 2509 import json, subprocess 2510 # start child process using unusual flags 2511 prog = ('from test.test_multiprocessing import TestFlags; ' + 2512 'TestFlags.run_in_child()') 2513 data = subprocess.check_output( 2514 [sys.executable, '-E', '-B', '-O', '-c', prog]) 2515 child_flags, grandchild_flags = json.loads(data.decode('ascii')) 2516 self.assertEqual(child_flags, grandchild_flags) 2517 2518# 2519# Issue #17555: ForkAwareThreadLock 2520# 2521 2522class TestForkAwareThreadLock(unittest.TestCase): 2523 # We recurisvely start processes. Issue #17555 meant that the 2524 # after fork registry would get duplicate entries for the same 2525 # lock. The size of the registry at generation n was ~2**n. 2526 2527 @classmethod 2528 def child(cls, n, conn): 2529 if n > 1: 2530 p = multiprocessing.Process(target=cls.child, args=(n-1, conn)) 2531 p.start() 2532 p.join() 2533 else: 2534 conn.send(len(util._afterfork_registry)) 2535 conn.close() 2536 2537 def test_lock(self): 2538 r, w = multiprocessing.Pipe(False) 2539 l = util.ForkAwareThreadLock() 2540 old_size = len(util._afterfork_registry) 2541 p = multiprocessing.Process(target=self.child, args=(5, w)) 2542 p.start() 2543 new_size = r.recv() 2544 p.join() 2545 self.assertLessEqual(new_size, old_size) 2546 2547# 2548# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc 2549# 2550 2551class TestIgnoreEINTR(unittest.TestCase): 2552 2553 @classmethod 2554 def _test_ignore(cls, conn): 2555 def handler(signum, frame): 2556 pass 2557 signal.signal(signal.SIGUSR1, handler) 2558 conn.send('ready') 2559 x = conn.recv() 2560 conn.send(x) 2561 conn.send_bytes(b'x'*(1024*1024)) # sending 1 MB should block 2562 2563 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') 2564 def test_ignore(self): 2565 conn, child_conn = multiprocessing.Pipe() 2566 try: 2567 p = multiprocessing.Process(target=self._test_ignore, 2568 args=(child_conn,)) 2569 p.daemon = True 2570 p.start() 2571 child_conn.close() 2572 self.assertEqual(conn.recv(), 'ready') 2573 time.sleep(0.1) 2574 os.kill(p.pid, signal.SIGUSR1) 2575 time.sleep(0.1) 2576 conn.send(1234) 2577 self.assertEqual(conn.recv(), 1234) 2578 time.sleep(0.1) 2579 os.kill(p.pid, signal.SIGUSR1) 2580 self.assertEqual(conn.recv_bytes(), b'x'*(1024*1024)) 2581 time.sleep(0.1) 2582 p.join() 2583 finally: 2584 conn.close() 2585 2586 @classmethod 2587 def _test_ignore_listener(cls, conn): 2588 def handler(signum, frame): 2589 pass 2590 signal.signal(signal.SIGUSR1, handler) 2591 l = multiprocessing.connection.Listener() 2592 conn.send(l.address) 2593 a = l.accept() 2594 a.send('welcome') 2595 2596 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') 2597 def test_ignore_listener(self): 2598 conn, child_conn = multiprocessing.Pipe() 2599 try: 2600 p = multiprocessing.Process(target=self._test_ignore_listener, 2601 args=(child_conn,)) 2602 p.daemon = True 2603 p.start() 2604 child_conn.close() 2605 address = conn.recv() 2606 time.sleep(0.1) 2607 os.kill(p.pid, signal.SIGUSR1) 2608 time.sleep(0.1) 2609 client = multiprocessing.connection.Client(address) 2610 self.assertEqual(client.recv(), 'welcome') 2611 p.join() 2612 finally: 2613 conn.close() 2614 2615# 2616# 2617# 2618 2619testcases_other = [OtherTest, TestInvalidHandle, TestInitializers, 2620 TestStdinBadfiledescriptor, TestTimeouts, TestNoForkBomb, 2621 TestFlags, TestForkAwareThreadLock, TestIgnoreEINTR] 2622 2623# 2624# 2625# 2626 2627def test_main(run=None): 2628 if sys.platform.startswith("linux"): 2629 try: 2630 lock = multiprocessing.RLock() 2631 except OSError: 2632 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!") 2633 2634 check_enough_semaphores() 2635 2636 if run is None: 2637 from test.test_support import run_unittest as run 2638 2639 util.get_temp_dir() # creates temp directory for use by all processes 2640 2641 multiprocessing.get_logger().setLevel(LOG_LEVEL) 2642 2643 ProcessesMixin.pool = multiprocessing.Pool(4) 2644 ThreadsMixin.pool = multiprocessing.dummy.Pool(4) 2645 ManagerMixin.manager.__init__() 2646 ManagerMixin.manager.start() 2647 ManagerMixin.pool = ManagerMixin.manager.Pool(4) 2648 2649 testcases = ( 2650 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) + 2651 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) + 2652 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) + 2653 testcases_other 2654 ) 2655 2656 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase 2657 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases) 2658 # (ncoghlan): Whether or not sys.exc_clear is executed by the threading 2659 # module during these tests is at least platform dependent and possibly 2660 # non-deterministic on any given platform. So we don't mind if the listed 2661 # warnings aren't actually raised. 2662 with test_support.check_py3k_warnings( 2663 (".+__(get|set)slice__ has been removed", DeprecationWarning), 2664 (r"sys.exc_clear\(\) not supported", DeprecationWarning), 2665 quiet=True): 2666 run(suite) 2667 2668 ThreadsMixin.pool.terminate() 2669 ProcessesMixin.pool.terminate() 2670 ManagerMixin.pool.terminate() 2671 ManagerMixin.manager.shutdown() 2672 2673 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool 2674 2675def main(): 2676 test_main(unittest.TextTestRunner(verbosity=2).run) 2677 2678if __name__ == '__main__': 2679 main() 2680