• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#
2# Unit tests for the multiprocessing package
3#
4
5import unittest
6import Queue
7import time
8import sys
9import os
10import gc
11import signal
12import array
13import socket
14import random
15import logging
16import errno
17import test.script_helper
18from test import test_support
19from StringIO import StringIO
20_multiprocessing = test_support.import_module('_multiprocessing')
21# import threading after _multiprocessing to raise a more relevant error
22# message: "No module named _multiprocessing". _multiprocessing is not compiled
23# without thread support.
24import threading
25
26# Work around broken sem_open implementations
27test_support.import_module('multiprocessing.synchronize')
28
29import multiprocessing.dummy
30import multiprocessing.connection
31import multiprocessing.managers
32import multiprocessing.heap
33import multiprocessing.pool
34
35from multiprocessing import util
36
37try:
38    from multiprocessing import reduction
39    HAS_REDUCTION = True
40except ImportError:
41    HAS_REDUCTION = False
42
43try:
44    from multiprocessing.sharedctypes import Value, copy
45    HAS_SHAREDCTYPES = True
46except ImportError:
47    HAS_SHAREDCTYPES = False
48
49try:
50    import msvcrt
51except ImportError:
52    msvcrt = None
53
#
#
#

# Alias used where the tests need a byte string constructor; on Python 2
# the native str type is already a byte string.
latin = str

#
# Constants
#

LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

DELTA = 0.1
CHECK_TIMINGS = False     # making true makes tests take a lot longer
                          # and can sometimes cause some non-serious
                          # failures because some calls block a bit
                          # longer than expected
if CHECK_TIMINGS:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
else:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1

HAVE_GETVALUE = not getattr(_multiprocessing,
                            'HAVE_BROKEN_SEM_GETVALUE', False)

WIN32 = (sys.platform == "win32")

# sysconf may be missing entirely (AttributeError), reject the name
# (ValueError) or fail at the OS level (OSError); fall back to a
# conservative default.  A bare "except:" here would also swallow
# KeyboardInterrupt/SystemExit, so catch only the expected errors.
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    MAXFD = 256

#
# Some tests require ctypes
#

try:
    from ctypes import Structure, c_int, c_double
except ImportError:
    # ctypes unavailable; tests that need it check these sentinels and skip.
    Structure = object
    c_int = c_double = None
96
97
def check_enough_semaphores():
    """Raise SkipTest when the platform advertises too few semaphores.

    POSIX requires at least 256 semaphores; if sysconf reports a smaller
    (and not unlimited, i.e. -1) limit, the test run is skipped.  Returns
    None when the limit is adequate or cannot be determined.
    """
    required = 256  # minimum number of semaphores available according to POSIX
    try:
        available = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if available != -1 and available < required:
        raise unittest.SkipTest("The OS doesn't support enough semaphores "
                                "to run the test (required: %d)." % required)
111
112
113#
114# Creates a wrapper for a function which records the time it takes to finish
115#
116
class TimingWrapper(object):
    """Callable proxy that times each invocation of the wrapped function.

    After any call, ``elapsed`` holds the wall-clock duration (seconds) of
    the most recent invocation; it is recorded even when the call raises.
    ``elapsed`` is None until the first call.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        start = time.time()
        try:
            return self.func(*args, **kwds)
        finally:
            # runs on both the return and the exception path
            self.elapsed = time.time() - start
129
130#
131# Base class for test cases
132#
133
class BaseTestCase(object):
    """Mixin shared by the processes/manager/threads test variants.

    Subclasses are combined with unittest.TestCase elsewhere; the helpers
    here rely on the TestCase assertion methods being present.
    """

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        # Timing assertions are meaningful only when CHECK_TIMINGS is on.
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        # Some backends raise NotImplementedError; treat that as a pass.
        try:
            res = func(*args)
        except NotImplementedError:
            return
        return self.assertEqual(value, res)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
156
157#
158# Return the value of a semaphore
159#
160
def get_value(self):
    """Best-effort read of a semaphore-like object's current value.

    Different semaphore implementations expose the counter under
    different names; probe each known accessor in turn and raise
    NotImplementedError when none of them works.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    try:
        return self._Semaphore__value
    except AttributeError:
        pass
    try:
        return self._value
    except AttributeError:
        raise NotImplementedError
172
173#
174# Testcases
175#
176
class _TestProcess(BaseTestCase):
    """Tests for the basic Process lifecycle API.

    Runs against both the real processes backend and the dummy (threads)
    backend; process-only checks skip themselves under threads.
    """

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        """current_process() describes the running parent process."""
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertTrue(not current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    @classmethod
    def _test(cls, q, *args, **kwds):
        # Runs in the child: echo arguments and identity back to the parent.
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        """Full lifecycle: create, start, exchange data, join, exitcode."""
        q = self.Queue(1)
        e = self.Event()
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        if self.TYPE != 'threads':
            # a child inherits the parent's authkey by default
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.exitcode, None)

        p.start()

        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # the child echoed its args (minus the queue itself), kwargs, name
        # and — for real processes — authkey and pid
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        # Sleep "forever" so the parent has something to terminate.
        time.sleep(1000)

    def test_terminate(self):
        """terminate() kills a running child and join() returns promptly."""
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        # join() after terminate() should return almost immediately
        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        """cpu_count() returns a positive int (or is unimplemented)."""
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        """A child appears in active_children() only while it runs."""
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.daemon = True
        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        # Child-side: report our id, then (up to depth 2) spawn two more
        # children, each of which does the same.
        from multiprocessing import forking
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        """Processes can spawn processes; ids arrive depth-first."""
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        # depth-first traversal order of the two-level spawn tree
        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)

    @classmethod
    def _test_sys_exit(cls, reason, testfn):
        # Redirect the child's stderr to a file so the parent can inspect
        # what sys.exit() printed for a non-integer argument.
        sys.stderr = open(testfn, 'w')
        sys.exit(reason)

    def test_sys_exit(self):
        """sys.exit() in a child yields the right exitcode and message."""
        # See Issue 13854
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        testfn = test_support.TESTFN
        self.addCleanup(test_support.unlink, testfn)

        # non-integer arguments are printed to stderr and exit with code 1
        for reason, code in (([1, 2, 3], 1), ('ignore this', 1)):
            p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
            p.daemon = True
            p.start()
            p.join(5)
            self.assertEqual(p.exitcode, code)

            with open(testfn, 'r') as f:
                self.assertEqual(f.read().rstrip(), str(reason))

        # integer (and bool) arguments become the exit code directly
        for reason in (True, False, 8):
            p = self.Process(target=sys.exit, args=(reason,))
            p.daemon = True
            p.start()
            p.join(5)
            self.assertEqual(p.exitcode, reason)
356
357#
358#
359#
360
361class _UpperCaser(multiprocessing.Process):
362
363    def __init__(self):
364        multiprocessing.Process.__init__(self)
365        self.child_conn, self.parent_conn = multiprocessing.Pipe()
366
367    def run(self):
368        self.parent_conn.close()
369        for s in iter(self.child_conn.recv, None):
370            self.child_conn.send(s.upper())
371        self.child_conn.close()
372
373    def submit(self, s):
374        assert type(s) is str
375        self.parent_conn.send(s)
376        return self.parent_conn.recv()
377
378    def stop(self):
379        self.parent_conn.send(None)
380        self.parent_conn.close()
381        self.child_conn.close()
382
class _TestSubclassingProcess(BaseTestCase):
    """Verify that a Process subclass with a custom run() works end-to-end."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        worker = _UpperCaser()
        worker.daemon = True
        worker.start()
        # exchange a couple of strings with the running child
        for word, expected in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(word), expected)
        worker.stop()
        worker.join()
395
396#
397#
398#
399
def queue_empty(q):
    """Return True when *q* holds no items.

    Some queue proxies do not implement empty(); fall back to comparing
    qsize() against zero in that case.
    """
    empty = getattr(q, 'empty', None)
    if empty is not None:
        return empty()
    return q.qsize() == 0
405
def queue_full(q, maxsize):
    """Return True when *q* is at capacity.

    Some queue proxies do not implement full(); fall back to comparing
    qsize() against *maxsize* in that case.
    """
    full = getattr(q, 'full', None)
    if full is not None:
        return full()
    return q.qsize() == maxsize
411
412
class _TestQueue(BaseTestCase):
    """Tests for Queue/JoinableQueue put/get semantics and timeouts."""


    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        # Child side of test_put: once released, drain the six items the
        # parent put, then let the parent proceed.
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        """All put() call signatures work; Full is raised per timeout rules."""
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # exercise every put() signature: default, block, block+timeout,
        # non-blocking, non-blocking+timeout, put_nowait
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # non-blocking puts on a full queue fail immediately ...
        self.assertRaises(Queue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        # ... while blocking puts wait roughly the requested timeout
        self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        # let the child drain the queue, then re-check emptiness
        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        # Child side of test_get: once released, feed items 2..5 to the
        # parent, then let it proceed.
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        """All get() call signatures work; Empty is raised per timeout rules."""
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # non-blocking gets on an empty queue fail immediately ...
        self.assertRaises(Queue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        # ... while blocking gets wait roughly the requested timeout
        self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        # Child side of test_fork: contribute items 10..19.
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        """A forked child gets its own working feeder thread."""
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.daemon = True
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(Queue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        """qsize() tracks puts and gets (when implemented)."""
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            # e.g. broken sem_getvalue(); nothing to test then
            self.skipTest('qsize method not implemented')
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        # Worker: acknowledge each item until the None sentinel arrives.
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        """JoinableQueue.join() returns once every item is task_done()'d."""
        queue = self.JoinableQueue()

        if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
            self.skipTest("requires 'queue.task_done()' method")

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in xrange(4)]

        for p in workers:
            p.daemon = True
            p.start()

        for i in xrange(10):
            queue.put(i)

        # blocks until all ten items have been acknowledged
        queue.join()

        # one sentinel per worker so each one exits its loop
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()

    def test_no_import_lock_contention(self):
        """Using a Queue at import time must not deadlock on the import lock."""
        with test_support.temp_cwd():
            module_name = 'imported_by_an_imported_module'
            with open(module_name + '.py', 'w') as f:
                f.write("""if 1:
                    import multiprocessing

                    q = multiprocessing.Queue()
                    q.put('knock knock')
                    q.get(timeout=3)
                    q.close()
                """)

            with test_support.DirsOnSysPath(os.getcwd()):
                try:
                    __import__(module_name)
                except Queue.Empty:
                    self.fail("Probable regression on import lock contention;"
                              " see Issue #22853")
642
643#
644#
645#
646
class _TestLock(BaseTestCase):
    """Sanity checks for Lock and RLock semantics."""

    def test_lock(self):
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        # a non-blocking acquire on a held lock fails immediately
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        # releasing an unheld lock raises (exact type varies per backend)
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    def test_rlock(self):
        lock = self.RLock()
        # an RLock can be acquired repeatedly by the same owner ...
        for _ in range(3):
            self.assertEqual(lock.acquire(), True)
        # ... and must be released the same number of times
        for _ in range(3):
            self.assertEqual(lock.release(), None)
        # one release too many raises
        self.assertRaises((AssertionError, RuntimeError), lock.release)

    def test_lock_context(self):
        # locks support the context-manager protocol
        with self.Lock():
            pass
669
670
class _TestSemaphore(BaseTestCase):
    """Tests for Semaphore/BoundedSemaphore counting and timeouts."""

    def _test_semaphore(self, sem):
        # *sem* must start with a value of 2: walk the counter down to 0,
        # verify a non-blocking acquire then fails, and walk it back up.
        self.assertReturnsIfImplemented(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(2, get_value, sem)

    def test_semaphore(self):
        """A plain Semaphore may be released above its initial value."""
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(3, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(4, get_value, sem)

    def test_bounded_semaphore(self):
        """A BoundedSemaphore passes the same basic counting checks."""
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        """acquire() on a zero-valued semaphore honours block/timeout."""
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # non-blocking acquires fail immediately, whatever the timeout
        self.assertEqual(acquire(False), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, None), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0)

        # blocking acquires wait roughly the requested timeout
        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
723
724
class _TestCondition(BaseTestCase):
    """Tests for Condition shared between processes and threads."""

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        # Sleeper: announce via `sleeping`, wait on the condition (possibly
        # timing out), then announce the wakeup via `woken`.
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        """notify() wakes exactly one waiter per call."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        p = self.Process(target=self.f, args=(cond, sleeping, woken))
        p.daemon = True
        p.start()

        # NOTE(review): `p` is rebound to the thread here, so only the
        # thread is joined at the end of this test.
        p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        p.daemon = True
        p.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)
        p.join()

    def test_notify_all(self):
        """Timed waits expire on their own; notify_all() wakes every waiter."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()

        # wait for them all to sleep
        for i in xrange(6):
            sleeping.acquire()

        # check they have all timed out
        for i in xrange(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()

        # wait for them to all sleep
        for i in xrange(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        """wait(timeout) returns after roughly the requested interval."""
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        # wait() is expected to return None here (no notification)
        self.assertEqual(res, None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
856
857
class _TestEvent(BaseTestCase):
    """Tests for Event set/clear/wait semantics and wait timeouts."""

    @classmethod
    def _test_event(cls, event):
        # Child: set the event after a delay so the parent's wait() blocks.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # Removed temporarily, due to API shear, this does not
        # work with threading._Event objects. is_set == isSet
        self.assertEqual(event.is_set(), False)

        # Removed, threading.Event.wait() will return the value of the __flag
        # instead of None. API Shear with the semaphore backed mp.Event
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # See note above on the API differences
        self.assertEqual(event.is_set(), True)
        # once set, wait() returns immediately regardless of timeout
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # a child setting the event unblocks the parent's wait()
        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
898
899#
900#
901#
902
class _TestValue(BaseTestCase):
    """Tests for shared Value/RawValue objects."""

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value written by the child)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        # sharedctypes needs a working sem_open(); skip otherwise
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        # Child process: overwrite every shared value with the third field.
        for sv, cv in zip(values, cls.codes_values):
            sv.value = cv[2]


    def test_value(self, raw=False):
        factory = self.RawValue if raw else self.Value
        values = [factory(code, value)
                  for code, value, _ in self.codes_values]

        # initial values are visible in the parent
        for sv, (_, initial, _) in zip(values, self.codes_values):
            self.assertEqual(sv.value, initial)

        proc = self.Process(target=self._test, args=(values,))
        proc.daemon = True
        proc.start()
        proc.join()

        # mutations made by the child are visible in the parent
        for sv, (_, _, changed) in zip(values, self.codes_values):
            self.assertEqual(sv.value, changed)

    def test_rawvalue(self):
        # same checks, but without the synchronizing lock wrapper
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        # default and lock=None both give a synchronized wrapper
        val1 = self.Value('i', 5)
        lock1 = val1.get_lock()
        obj1 = val1.get_obj()

        val2 = self.Value('i', 5, lock=None)
        lock2 = val2.get_lock()
        obj2 = val2.get_obj()

        # an explicitly supplied lock is used as-is
        lock = self.Lock()
        val3 = self.Value('i', 5, lock=lock)
        lock3 = val3.get_lock()
        obj3 = val3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False yields a bare object with no accessor methods
        val4 = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(val4, 'get_lock'))
        self.assertFalse(hasattr(val4, 'get_obj'))

        # a non-lock, non-bool lock argument is rejected
        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        # RawValue likewise has no accessor methods
        val5 = self.RawValue('i', 5)
        self.assertFalse(hasattr(val5, 'get_lock'))
        self.assertFalse(hasattr(val5, 'get_obj'))
971
class _TestArray(BaseTestCase):
    """Tests for multiprocessing.Array / RawArray shared ctypes arrays."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        # In-place cumulative sum; run in a child process to prove that
        # writes to the shared array are visible to the parent.
        for i in range(1, len(seq)):
            seq[i] += seq[i-1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        # Basic indexing/slicing plus cross-process mutation.  With
        # raw=True the same checks run against RawArray (no lock).
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        if raw:
            arr = self.RawArray('i', seq)
        else:
            arr = self.Array('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Slice assignment accepts an array.array of matching typecode.
        arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])

        self.assertEqual(list(arr[:]), seq)

        # Apply the same transformation locally and in a child process ...
        self.f(seq)

        p = self.Process(target=self.f, args=(arr,))
        p.daemon = True
        p.start()
        p.join()

        # ... and check that both results agree.
        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        size = 10
        # Test for zeroing (see issue #11675).
        # The repetition below strengthens the test by increasing the chances
        # of previously allocated non-zero memory being used for the new array
        # on the 2nd and 3rd loops.
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            self.assertEqual(list(arr), [0] * size)
            arr[:] = range(10)
            self.assertEqual(list(arr), range(10))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        # Re-run test_array against the lockless RawArray variant.
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_accepts_long(self):
        # The size argument may be a Python 2 long, not just an int.
        arr = self.Array('i', 10L)
        self.assertEqual(len(arr), 10)
        raw_arr = self.RawArray('i', 10L)
        self.assertEqual(len(raw_arr), 10)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        # Omitting lock and passing lock=None both create a fresh lock.
        arr1 = self.Array('i', range(10))
        lock1 = arr1.get_lock()
        obj1 = arr1.get_obj()

        arr2 = self.Array('i', range(10), lock=None)
        lock2 = arr2.get_lock()
        obj2 = arr2.get_obj()

        # An explicit lock object is used as-is.
        lock = self.Lock()
        arr3 = self.Array('i', range(10), lock=lock)
        lock3 = arr3.get_lock()
        obj3 = arr3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False yields a raw wrapper without get_lock()/get_obj().
        arr4 = self.Array('i', range(10), lock=False)
        self.assertFalse(hasattr(arr4, 'get_lock'))
        self.assertFalse(hasattr(arr4, 'get_obj'))
        # Any other non-lock value for `lock` is rejected.
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        arr5 = self.RawArray('i', range(10))
        self.assertFalse(hasattr(arr5, 'get_lock'))
        self.assertFalse(hasattr(arr5, 'get_obj'))
1057
1058#
1059#
1060#
1061
class _TestContainers(BaseTestCase):
    """Exercise the list, dict and Namespace proxies of a manager."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        ten = self.list(range(10))
        self.assertEqual(ten[:], range(10))

        grown = self.list()
        self.assertEqual(grown[:], [])

        grown.extend(range(5))
        self.assertEqual(grown[:], range(5))

        # Indexing and out-of-range slicing behave like a real list.
        self.assertEqual(grown[2], 2)
        self.assertEqual(grown[2:10], [2,3,4])

        grown *= 2
        self.assertEqual(grown[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])

        self.assertEqual(grown + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        self.assertEqual(ten[:], range(10))

        # A proxied list of proxied lists materialises nested values.
        nested = self.list([ten, grown])
        self.assertEqual(
            nested[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # The contained proxy tracks later mutations of the original.
        holder = self.list([ten])
        ten.append('hello')
        self.assertEqual(holder[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        mapping = self.dict()
        codes = range(65, 70)
        for code in codes:
            mapping[code] = chr(code)
        self.assertEqual(mapping.copy(), dict((c, chr(c)) for c in codes))
        self.assertEqual(sorted(mapping.keys()), codes)
        self.assertEqual(sorted(mapping.values()), [chr(c) for c in codes])
        self.assertEqual(sorted(mapping.items()), [(c, chr(c)) for c in codes])

    def test_namespace(self):
        ns = self.Namespace()
        ns.name = 'Bob'
        ns.job = 'Builder'
        ns._hidden = 'hidden'
        self.assertEqual((ns.name, ns.job), ('Bob', 'Builder'))
        del ns.job
        # Underscore attributes are hidden from repr(); deleted ones vanish.
        self.assertEqual(str(ns), "Namespace(name='Bob')")
        self.assertTrue(hasattr(ns, 'name'))
        self.assertTrue(not hasattr(ns, 'job'))
1117
1118#
1119#
1120#
1121
def sqr(x, wait=0.0):
    """Return x squared, after first sleeping for `wait` seconds."""
    time.sleep(wait)
    return x ** 2
1125
1126class SayWhenError(ValueError): pass
1127
def exception_throwing_generator(total, when):
    """Yield 0 .. total-1, but raise SayWhenError upon reaching index `when`."""
    i = 0
    while i < total:
        if i == when:
            raise SayWhenError("Somebody said when")
        yield i
        i += 1
1133
class _TestPool(BaseTestCase):
    """Tests for Pool.apply/map/imap and friends, in every pool flavour."""

    def test_apply(self):
        # apply() accepts a positional-args tuple and a keyword-args dict.
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
        self.assertEqual(pmap(sqr, range(100), chunksize=20),
                         map(sqr, range(100)))

    # NOTE(review): method name has a typo ("unplicklable"); renaming it
    # would change the reported test id, so it is kept as-is.
    def test_map_unplicklable(self):
        # Issue #19425 -- failure to pickle should not cause a hang
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        class A(object):
            def __reduce__(self):
                raise RuntimeError('cannot pickle')
        with self.assertRaises(RuntimeError):
            self.pool.map(sqr, [A()]*10)

    def test_map_chunksize(self):
        # An explicit chunksize with an empty iterable must not stall.
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        # The worker sleeps TIMEOUT1, so get() should block about that long.
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # get(timeout=...) raises when the timeout expires rather than
        # waiting for the longer-running task to finish.
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, range(10))
        self.assertEqual(list(it), map(sqr, range(10)))

        # Results arrive in order and the iterator ends with StopIteration.
        it = self.pool.imap(sqr, range(10))
        for i in range(10):
            self.assertEqual(it.next(), i*i)
        self.assertRaises(StopIteration, it.next)

        it = self.pool.imap(sqr, range(1000), chunksize=100)
        for i in range(1000):
            self.assertEqual(it.next(), i*i)
        self.assertRaises(StopIteration, it.next)

    def test_imap_handle_iterable_exception(self):
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        # An exception raised by the *input* iterable must surface to the
        # consumer of the result iterator, after the preceding results.
        it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
        for i in range(3):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, it.next)

        # SayWhenError seen at start of problematic chunk's results
        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2)
        for i in range(6):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, it.next)
        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4)
        for i in range(4):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, it.next)

    def test_imap_unordered(self):
        it = self.pool.imap_unordered(sqr, range(1000))
        self.assertEqual(sorted(it), map(sqr, range(1000)))

        it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
        self.assertEqual(sorted(it), map(sqr, range(1000)))

    def test_imap_unordered_handle_iterable_exception(self):
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        it = self.pool.imap_unordered(sqr,
                                      exception_throwing_generator(10, 3),
                                      1)
        expected_values = map(sqr, range(10))
        with self.assertRaises(SayWhenError):
            # imap_unordered makes it difficult to anticipate the SayWhenError
            for i in range(10):
                value = next(it)
                self.assertIn(value, expected_values)
                expected_values.remove(value)

        it = self.pool.imap_unordered(sqr,
                                      exception_throwing_generator(20, 7),
                                      2)
        expected_values = map(sqr, range(20))
        with self.assertRaises(SayWhenError):
            for i in range(20):
                value = next(it)
                self.assertIn(value, expected_values)
                expected_values.remove(value)

    def test_make_pool(self):
        # Worker counts must be strictly positive.
        self.assertRaises(ValueError, multiprocessing.Pool, -1)
        self.assertRaises(ValueError, multiprocessing.Pool, 0)

        p = multiprocessing.Pool(3)
        self.assertEqual(3, len(p._pool))
        p.close()
        p.join()

    def test_terminate(self):
        p = self.Pool(4)
        result = p.map_async(
            time.sleep, [0.1 for i in range(10000)], chunksize=1
            )
        p.terminate()
        join = TimingWrapper(p.join)
        join()
        # terminate() should make join() return almost immediately even
        # though most of the 10000 queued tasks were never run.
        self.assertTrue(join.elapsed < 0.2)

    def test_empty_iterable(self):
        # See Issue 12157
        p = self.Pool(1)

        self.assertEqual(p.map(sqr, []), [])
        self.assertEqual(list(p.imap(sqr, [])), [])
        self.assertEqual(list(p.imap_unordered(sqr, [])), [])
        self.assertEqual(p.map_async(sqr, []).get(), [])

        p.close()
        p.join()
1270
def unpickleable_result():
    """Return a function object that the pickle module cannot serialize."""
    def inner():
        return 42
    return inner
1273
class _TestPoolWorkerErrors(BaseTestCase):
    """Worker processes must survive errors while encoding results."""

    ALLOWED_TYPES = ('processes', )

    def test_unpickleable_result(self):
        from multiprocessing.pool import MaybeEncodingError
        pool = multiprocessing.Pool(2)

        # Repeat many times: a worker killed by the encoding error would
        # eventually leave the pool unable to answer further requests.
        for _ in range(20):
            async_result = pool.apply_async(unpickleable_result)
            self.assertRaises(MaybeEncodingError, async_result.get)

        pool.close()
        pool.join()
1288
class _TestPoolWorkerLifetime(BaseTestCase):
    """Tests for Pool(maxtasksperchild=...) worker recycling."""

    ALLOWED_TYPES = ('processes', )
    def test_pool_worker_lifetime(self):
        # Each worker should be retired after 10 tasks and replaced.
        p = multiprocessing.Pool(3, maxtasksperchild=10)
        self.assertEqual(3, len(p._pool))
        origworkerpids = [w.pid for w in p._pool]
        # Run many tasks so each worker gets replaced (hopefully)
        results = []
        for i in range(100):
            results.append(p.apply_async(sqr, (i, )))
        # Fetch the results and verify we got the right answers,
        # also ensuring all the tasks have completed.
        for (j, res) in enumerate(results):
            self.assertEqual(res.get(), sqr(j))
        # Refill the pool
        p._repopulate_pool()
        # Wait until all workers are alive
        # (countdown * DELTA = 5 seconds max startup process time)
        countdown = 50
        while countdown and not all(w.is_alive() for w in p._pool):
            countdown -= 1
            time.sleep(DELTA)
        finalworkerpids = [w.pid for w in p._pool]
        # All pids should be assigned.  See issue #7805.
        self.assertNotIn(None, origworkerpids)
        self.assertNotIn(None, finalworkerpids)
        # Finally, check that the worker pids have changed
        self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
        p.close()
        p.join()

    def test_pool_worker_lifetime_early_close(self):
        # Issue #10332: closing a pool whose workers have limited lifetimes
        # before all the tasks completed would make join() hang.
        p = multiprocessing.Pool(3, maxtasksperchild=1)
        results = []
        for i in range(6):
            results.append(p.apply_async(sqr, (i, 0.3)))
        p.close()
        p.join()
        # check the results
        for (j, res) in enumerate(results):
            self.assertEqual(res.get(), sqr(j))
1333
1334
1335#
1336# Test that manager has expected number of shared objects left
1337#
1338
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            # NOTE(review): prints a *fresh* _debug_info() followed by the
            # snapshot captured above; a difference between the two outputs
            # would itself be diagnostic of objects appearing/disappearing.
            print self.manager._debug_info()
            print debug_info

        self.assertEqual(refs, EXPECTED_NUMBER)
1357
1358#
1359# Test of creating a customized manager class
1360#
1361
1362from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1363
class FooBar(object):
    """Target class served through MyManager proxies."""

    def f(self):
        """Public method; exposed by the default exposure rule."""
        return 'f()'

    def g(self):
        """Public method that always fails."""
        raise ValueError

    def _h(self):
        """Underscore method; hidden unless listed in `exposed`."""
        return '_h()'
1371
def baz():
    """Generator yielding the squares 0, 1, 4, ..., 81; served via IteratorProxy."""
    for i in xrange(10):
        yield i*i
1375
class IteratorProxy(BaseProxy):
    """Proxy exposing both the Python 2 and Python 3 iterator protocols."""
    # Remote methods that may be invoked through _callmethod().
    _exposed_ = ('next', '__next__')
    def __iter__(self):
        # The proxy is its own iterator.
        return self
    def next(self):
        # Python 2 iterator protocol: forward to the referent's next().
        return self._callmethod('next')
    def __next__(self):
        # Python 3 iterator protocol.
        return self._callmethod('__next__')
1384
class MyManager(BaseManager):
    """Custom manager; the test types are registered on it at module level."""
1387
# Register the test types: 'Foo' exposes public methods only (the default),
# 'Bar' explicitly exposes f() and the underscore method _h(), and 'baz'
# uses a custom proxy type that supports iteration.
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1391
1392
class _TestMyManager(BaseTestCase):
    """End-to-end checks of the custom MyManager registrations."""

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()

        foo = manager.Foo()
        bar = manager.Bar()
        baz = manager.baz()

        # 'Foo' uses the default exposure rule (public methods only);
        # 'Bar' was registered with exposed=('f', '_h').
        foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
        bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]

        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        # _h is not exposed on Foo, so calling it remotely must fail.
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        # baz() is a generator on the server; IteratorProxy iterates it.
        self.assertEqual(list(baz), [i*i for i in range(10)])

        manager.shutdown()
1424
1425#
1426# Test of connecting to a remote server and using xmlrpclib for serialization
1427#
1428
# Single module-level queue served to every client of QueueManager.
_queue = Queue.Queue()
def get_queue():
    """Return the shared module-level queue (registered as a manager callable)."""
    return _queue
1432
class QueueManager(BaseManager):
    '''manager class used by server process'''
# The server side provides a callable, so it owns the actual queue.
QueueManager.register('get_queue', callable=get_queue)
1436
class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''
# No callable: clients use this class only to connect to an existing server.
QueueManager2.register('get_queue')
1440
1441
1442SERIALIZER = 'xmlrpclib'
1443
class _TestRemoteManager(BaseTestCase):
    """Connect to a manager server using xmlrpclib serialization."""

    ALLOWED_TYPES = ('manager',)
    # Payload round-tripped through the remote queue; `result` is what we
    # expect back after serialization.
    values = ['hello world', None, True, 2.25,
              #'hall\xc3\xa5 v\xc3\xa4rlden'] # UTF-8
              ]
    result = values[:]
    if test_support.have_unicode:
        #result[-1] = u'hall\xe5 v\xe4rlden'
        uvalue = test_support.u(r'\u043f\u0440\u0438\u0432\u0456\u0442 '
                                r'\u0441\u0432\u0456\u0442')
        values.append(uvalue)
        result.append(uvalue)

    @classmethod
    def _putter(cls, address, authkey):
        # Child process: connect with the client-side manager class and
        # push the test payload into the shared queue.
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        # Note that xmlrpclib will deserialize object as a list not a tuple
        queue.put(tuple(cls.values))

    def test_remote(self):
        authkey = os.urandom(32)

        # Port 0: let the OS pick a free port.
        manager = QueueManager(
            address=(test.test_support.HOST, 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()

        manager2 = QueueManager2(
            address=manager.address, authkey=authkey, serializer=SERIALIZER
            )
        manager2.connect()
        queue = manager2.get_queue()

        self.assertEqual(queue.get(), self.result)

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del queue
        manager.shutdown()
1495
class _TestManagerRestart(BaseTestCase):
    """A manager must be restartable on the same address right after shutdown."""

    @classmethod
    def _putter(cls, address, authkey):
        # Child process: connect and enqueue a single message.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=(test.test_support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        del queue
        manager.shutdown()
        # Immediately rebind a new manager to the same address; this must
        # not fail with an address-in-use error.
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        manager.start()
        manager.shutdown()
1528
1529#
1530#
1531#
1532
1533SENTINEL = latin('')
1534
1535class _TestConnection(BaseTestCase):
1536
1537    ALLOWED_TYPES = ('processes', 'threads')
1538
1539    @classmethod
1540    def _echo(cls, conn):
1541        for msg in iter(conn.recv_bytes, SENTINEL):
1542            conn.send_bytes(msg)
1543        conn.close()
1544
1545    def test_connection(self):
1546        conn, child_conn = self.Pipe()
1547
1548        p = self.Process(target=self._echo, args=(child_conn,))
1549        p.daemon = True
1550        p.start()
1551
1552        seq = [1, 2.25, None]
1553        msg = latin('hello world')
1554        longmsg = msg * 10
1555        arr = array.array('i', range(4))
1556
1557        if self.TYPE == 'processes':
1558            self.assertEqual(type(conn.fileno()), int)
1559
1560        self.assertEqual(conn.send(seq), None)
1561        self.assertEqual(conn.recv(), seq)
1562
1563        self.assertEqual(conn.send_bytes(msg), None)
1564        self.assertEqual(conn.recv_bytes(), msg)
1565
1566        if self.TYPE == 'processes':
1567            buffer = array.array('i', [0]*10)
1568            expected = list(arr) + [0] * (10 - len(arr))
1569            self.assertEqual(conn.send_bytes(arr), None)
1570            self.assertEqual(conn.recv_bytes_into(buffer),
1571                             len(arr) * buffer.itemsize)
1572            self.assertEqual(list(buffer), expected)
1573
1574            buffer = array.array('i', [0]*10)
1575            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1576            self.assertEqual(conn.send_bytes(arr), None)
1577            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1578                             len(arr) * buffer.itemsize)
1579            self.assertEqual(list(buffer), expected)
1580
1581            buffer = bytearray(latin(' ' * 40))
1582            self.assertEqual(conn.send_bytes(longmsg), None)
1583            try:
1584                res = conn.recv_bytes_into(buffer)
1585            except multiprocessing.BufferTooShort, e:
1586                self.assertEqual(e.args, (longmsg,))
1587            else:
1588                self.fail('expected BufferTooShort, got %s' % res)
1589
1590        poll = TimingWrapper(conn.poll)
1591
1592        self.assertEqual(poll(), False)
1593        self.assertTimingAlmostEqual(poll.elapsed, 0)
1594
1595        self.assertEqual(poll(TIMEOUT1), False)
1596        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1597
1598        conn.send(None)
1599        time.sleep(.1)
1600
1601        self.assertEqual(poll(TIMEOUT1), True)
1602        self.assertTimingAlmostEqual(poll.elapsed, 0)
1603
1604        self.assertEqual(conn.recv(), None)
1605
1606        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
1607        conn.send_bytes(really_big_msg)
1608        self.assertEqual(conn.recv_bytes(), really_big_msg)
1609
1610        conn.send_bytes(SENTINEL)                          # tell child to quit
1611        child_conn.close()
1612
1613        if self.TYPE == 'processes':
1614            self.assertEqual(conn.readable, True)
1615            self.assertEqual(conn.writable, True)
1616            self.assertRaises(EOFError, conn.recv)
1617            self.assertRaises(EOFError, conn.recv_bytes)
1618
1619        p.join()
1620
1621    def test_duplex_false(self):
1622        reader, writer = self.Pipe(duplex=False)
1623        self.assertEqual(writer.send(1), None)
1624        self.assertEqual(reader.recv(), 1)
1625        if self.TYPE == 'processes':
1626            self.assertEqual(reader.readable, True)
1627            self.assertEqual(reader.writable, False)
1628            self.assertEqual(writer.readable, False)
1629            self.assertEqual(writer.writable, True)
1630            self.assertRaises(IOError, reader.send, 2)
1631            self.assertRaises(IOError, writer.recv)
1632            self.assertRaises(IOError, writer.poll)
1633
1634    def test_spawn_close(self):
1635        # We test that a pipe connection can be closed by parent
1636        # process immediately after child is spawned.  On Windows this
1637        # would have sometimes failed on old versions because
1638        # child_conn would be closed before the child got a chance to
1639        # duplicate it.
1640        conn, child_conn = self.Pipe()
1641
1642        p = self.Process(target=self._echo, args=(child_conn,))
1643        p.daemon = True
1644        p.start()
1645        child_conn.close()    # this might complete before child initializes
1646
1647        msg = latin('hello')
1648        conn.send_bytes(msg)
1649        self.assertEqual(conn.recv_bytes(), msg)
1650
1651        conn.send_bytes(SENTINEL)
1652        conn.close()
1653        p.join()
1654
1655    def test_sendbytes(self):
1656        if self.TYPE != 'processes':
1657            self.skipTest('test not appropriate for {}'.format(self.TYPE))
1658
1659        msg = latin('abcdefghijklmnopqrstuvwxyz')
1660        a, b = self.Pipe()
1661
1662        a.send_bytes(msg)
1663        self.assertEqual(b.recv_bytes(), msg)
1664
1665        a.send_bytes(msg, 5)
1666        self.assertEqual(b.recv_bytes(), msg[5:])
1667
1668        a.send_bytes(msg, 7, 8)
1669        self.assertEqual(b.recv_bytes(), msg[7:7+8])
1670
1671        a.send_bytes(msg, 26)
1672        self.assertEqual(b.recv_bytes(), latin(''))
1673
1674        a.send_bytes(msg, 26, 0)
1675        self.assertEqual(b.recv_bytes(), latin(''))
1676
1677        self.assertRaises(ValueError, a.send_bytes, msg, 27)
1678
1679        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1680
1681        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1682
1683        self.assertRaises(ValueError, a.send_bytes, msg, -1)
1684
1685        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1686
1687    @classmethod
1688    def _is_fd_assigned(cls, fd):
1689        try:
1690            os.fstat(fd)
1691        except OSError as e:
1692            if e.errno == errno.EBADF:
1693                return False
1694            raise
1695        else:
1696            return True
1697
1698    @classmethod
1699    def _writefd(cls, conn, data, create_dummy_fds=False):
1700        if create_dummy_fds:
1701            for i in range(0, 256):
1702                if not cls._is_fd_assigned(i):
1703                    os.dup2(conn.fileno(), i)
1704        fd = reduction.recv_handle(conn)
1705        if msvcrt:
1706            fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
1707        os.write(fd, data)
1708        os.close(fd)
1709
1710    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
1711    def test_fd_transfer(self):
1712        if self.TYPE != 'processes':
1713            self.skipTest("only makes sense with processes")
1714        conn, child_conn = self.Pipe(duplex=True)
1715
1716        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
1717        p.daemon = True
1718        p.start()
1719        with open(test_support.TESTFN, "wb") as f:
1720            fd = f.fileno()
1721            if msvcrt:
1722                fd = msvcrt.get_osfhandle(fd)
1723            reduction.send_handle(conn, fd, p.pid)
1724        p.join()
1725        with open(test_support.TESTFN, "rb") as f:
1726            self.assertEqual(f.read(), b"foo")
1727
1728    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
1729    @unittest.skipIf(sys.platform == "win32",
1730                     "test semantics don't make sense on Windows")
1731    @unittest.skipIf(MAXFD <= 256,
1732                     "largest assignable fd number is too small")
1733    @unittest.skipUnless(hasattr(os, "dup2"),
1734                         "test needs os.dup2()")
1735    def test_large_fd_transfer(self):
1736        # With fd > 256 (issue #11657)
1737        if self.TYPE != 'processes':
1738            self.skipTest("only makes sense with processes")
1739        conn, child_conn = self.Pipe(duplex=True)
1740
1741        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
1742        p.daemon = True
1743        p.start()
1744        with open(test_support.TESTFN, "wb") as f:
1745            fd = f.fileno()
1746            for newfd in range(256, MAXFD):
1747                if not self._is_fd_assigned(newfd):
1748                    break
1749            else:
1750                self.fail("could not find an unassigned large file descriptor")
1751            os.dup2(fd, newfd)
1752            try:
1753                reduction.send_handle(conn, newfd, p.pid)
1754            finally:
1755                os.close(newfd)
1756        p.join()
1757        with open(test_support.TESTFN, "rb") as f:
1758            self.assertEqual(f.read(), b"bar")
1759
1760    @classmethod
1761    def _send_data_without_fd(self, conn):
1762        os.write(conn.fileno(), b"\0")
1763
1764    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
1765    @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
1766    def test_missing_fd_transfer(self):
1767        # Check that exception is raised when received data is not
1768        # accompanied by a file descriptor in ancillary data.
1769        if self.TYPE != 'processes':
1770            self.skipTest("only makes sense with processes")
1771        conn, child_conn = self.Pipe(duplex=True)
1772
1773        p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
1774        p.daemon = True
1775        p.start()
1776        self.assertRaises(RuntimeError, reduction.recv_handle, conn)
1777        p.join()
1778
class _TestListenerClient(BaseTestCase):
    """Round-trip tests for connection.Listener / connection.Client."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        # Child side: connect, say hello, hang up.
        conn = cls.connection.Client(address)
        conn.send('hello')
        conn.close()

    def test_listener_client(self):
        # Accept one client per supported address family.
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            proc = self.Process(target=self._test, args=(listener.address,))
            proc.daemon = True
            proc.start()
            conn = listener.accept()
            self.assertEqual(conn.recv(), 'hello')
            proc.join()
            listener.close()

    def test_issue14725(self):
        listener = self.connection.Listener()
        proc = self.Process(target=self._test, args=(listener.address,))
        proc.daemon = True
        proc.start()
        time.sleep(1)
        # On Windows the client process should by now have connected,
        # written its data and closed the pipe handle, which used to make
        # ConnectNamedPipe() fail with ERROR_NO_DATA.  See Issue 14725.
        conn = listener.accept()
        self.assertEqual(conn.recv(), 'hello')
        conn.close()
        proc.join()
        listener.close()
1815
1816#
1817# Test of sending connection and socket objects between processes
1818#
1819"""
1820class _TestPicklingConnections(BaseTestCase):
1821
1822    ALLOWED_TYPES = ('processes',)
1823
1824    def _listener(self, conn, families):
1825        for fam in families:
1826            l = self.connection.Listener(family=fam)
1827            conn.send(l.address)
1828            new_conn = l.accept()
1829            conn.send(new_conn)
1830
1831        if self.TYPE == 'processes':
1832            l = socket.socket()
1833            l.bind(('localhost', 0))
1834            conn.send(l.getsockname())
1835            l.listen(1)
1836            new_conn, addr = l.accept()
1837            conn.send(new_conn)
1838
1839        conn.recv()
1840
1841    def _remote(self, conn):
1842        for (address, msg) in iter(conn.recv, None):
1843            client = self.connection.Client(address)
1844            client.send(msg.upper())
1845            client.close()
1846
1847        if self.TYPE == 'processes':
1848            address, msg = conn.recv()
1849            client = socket.socket()
1850            client.connect(address)
1851            client.sendall(msg.upper())
1852            client.close()
1853
1854        conn.close()
1855
1856    def test_pickling(self):
1857        try:
1858            multiprocessing.allow_connection_pickling()
1859        except ImportError:
1860            return
1861
1862        families = self.connection.families
1863
1864        lconn, lconn0 = self.Pipe()
1865        lp = self.Process(target=self._listener, args=(lconn0, families))
1866        lp.daemon = True
1867        lp.start()
1868        lconn0.close()
1869
1870        rconn, rconn0 = self.Pipe()
1871        rp = self.Process(target=self._remote, args=(rconn0,))
1872        rp.daemon = True
1873        rp.start()
1874        rconn0.close()
1875
1876        for fam in families:
1877            msg = ('This connection uses family %s' % fam).encode('ascii')
1878            address = lconn.recv()
1879            rconn.send((address, msg))
1880            new_conn = lconn.recv()
1881            self.assertEqual(new_conn.recv(), msg.upper())
1882
1883        rconn.send(None)
1884
1885        if self.TYPE == 'processes':
1886            msg = latin('This connection uses a normal socket')
1887            address = lconn.recv()
1888            rconn.send((address, msg))
1889            if hasattr(socket, 'fromfd'):
1890                new_conn = lconn.recv()
1891                self.assertEqual(new_conn.recv(100), msg.upper())
1892            else:
1893                # XXX On Windows with Py2.6 need to backport fromfd()
1894                discard = lconn.recv_bytes()
1895
1896        lconn.send(None)
1897
1898        rconn.close()
1899        lconn.close()
1900
1901        lp.join()
1902        rp.join()
1903"""
1904#
1905#
1906#
1907
class _TestHeap(BaseTestCase):
    """Stress and consistency checks for multiprocessing.heap's arena
    allocator (BufferWrapper)."""

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        # Allocate and free many randomly sized blocks, then check that the
        # heap's free and occupied block lists exactly tile each arena.
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for i in xrange(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            b = multiprocessing.heap.BufferWrapper(size)
            blocks.append(b)
            if len(blocks) > maxblocks:
                # Fix: use a separate name -- the original reused `i`,
                # shadowing the outer loop index.
                victim = random.randrange(maxblocks)
                del blocks[victim]

        # get the heap object shared by all BufferWrappers
        heap = multiprocessing.heap.BufferWrapper._heap

        # Verify the state of the heap: gather every block as
        # (arena-index, start, stop, length, state) tuples.
        # (`all_blocks` rather than `all`, which shadowed the builtin.)
        all_blocks = []
        occupied = 0    # total occupied bytes; accumulated but not asserted
        heap._lock.acquire()
        self.addCleanup(heap._lock.release)
        for seq in heap._len_to_seq.values():
            for arena, start, stop in seq:
                all_blocks.append((heap._arenas.index(arena), start, stop,
                                   stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            all_blocks.append((heap._arenas.index(arena), start, stop,
                               stop-start, 'occupied'))
            occupied += (stop-start)

        all_blocks.sort()

        # Adjacent entries must either abut within one arena, or start a
        # new arena at offset 0 -- i.e. no gaps and no overlaps.
        for i in range(len(all_blocks)-1):
            (arena, start, stop) = all_blocks[i][:3]
            (narena, nstart, nstop) = all_blocks[i+1][:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))

    def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't deadlock
        # (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for i in range(5000):
            a = multiprocessing.heap.BufferWrapper(1)
            b = multiprocessing.heap.BufferWrapper(1)
            # circular references
            a.buddy = b
            b.buddy = a
1972
1973#
1974#
1975#
1976
1977class _Foo(Structure):
1978    _fields_ = [
1979        ('x', c_int),
1980        ('y', c_double)
1981        ]
1982
class _TestSharedCTypes(BaseTestCase):
    """Tests for multiprocessing.sharedctypes shared Values/Arrays and copy()."""

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        # Nothing to test without the sharedctypes module.
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        # Runs in the child: double every shared value in place so the
        # parent can observe the mutations through shared memory.
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for i, v in enumerate(arr):
            arr[i] = v * 2

    def test_sharedctypes(self, lock=False):
        # One shared object of each flavour...
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', range(10), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        # ...have a child process double them all...
        proc = self.Process(target=self._double, args=(x, y, foo, arr, string))
        proc.daemon = True
        proc.start()
        proc.join()

        # ...and confirm the parent sees every doubled value.
        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for i, v in enumerate(arr):
            self.assertAlmostEqual(v, i*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # Same round-trip, but through the lock-wrapped proxies.
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        # copy() must snapshot the struct: mutating the original afterwards
        # must not affect the copy.
        foo = _Foo(2, 5.0)
        bar = copy(foo)
        foo.x = 0
        foo.y = 0
        self.assertEqual(bar.x, 2)
        self.assertAlmostEqual(bar.y, 5.0)
2032
2033#
2034#
2035#
2036
class _TestFinalize(BaseTestCase):
    """Tests for util.Finalize: weakref-triggered, explicit, and exit-time
    (priority-ordered) finalization callbacks."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        # Runs in a child process; each finalizer sends its tag over the
        # pipe, so the parent can check which callbacks ran and in what
        # order.
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # The remaining finalizers fire during _exit_function().  Per the
        # asserted result below: higher exitpriority runs first, equal
        # priorities run in reverse registration order, and 'c' (with no
        # exitpriority) is not run at exit at all.
        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._test_finalize, args=(child_conn,))
        p.daemon = True
        p.start()
        p.join()

        # Collect tags until the lowest-priority 'STOP' finalizer arrives.
        result = [obj for obj in iter(conn.recv, 'STOP')]
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2089
2090#
2091# Test that from ... import * works for each module
2092#
2093
class _TestImportStar(BaseTestCase):
    """Check that every name advertised in each submodule's __all__ really
    exists, i.e. that `from ... import *` works for each module."""

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        modules = [
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.synchronize', 'multiprocessing.util'
            ]

        # Optional submodules, present only when their C support exists.
        if HAS_REDUCTION:
            modules.append('multiprocessing.reduction')
        if c_int is not None:
            # This module requires _ctypes
            modules.append('multiprocessing.sharedctypes')

        for name in modules:
            __import__(name)
            mod = sys.modules[name]
            # Every exported name must resolve on the module object.
            for attr in getattr(mod, '__all__', ()):
                msg = '%r does not have attribute %r' % (mod, attr)
                self.assertTrue(hasattr(mod, attr), msg)
2122
2123#
2124# Quick test that logging works -- does not test logging output
2125#
2126
class _TestLogging(BaseTestCase):
    """Quick checks that multiprocessing's logging works; the actual log
    output is not inspected."""

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        logger.setLevel(util.SUBWARNING)
        self.assertTrue(logger is not None)
        # Both records are below the SUBWARNING threshold set above.
        logger.debug('this will not be printed')
        logger.info('nor will this')
        # Restore the module-wide default level for later tests.
        logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        # Child process: report the effective logger level to the parent.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def test_level(self):
        # Two arbitrary, distinct custom level values.
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)

        # A level set on the multiprocessing logger must be observed as the
        # effective level in a child process.
        logger.setLevel(LEVEL1)
        p = self.Process(target=self._test_level, args=(writer,))
        p.daemon = True
        p.start()
        self.assertEqual(LEVEL1, reader.recv())

        # With the logger at NOTSET, the root logger's level becomes the
        # effective level seen by the child.
        logger.setLevel(logging.NOTSET)
        root_logger.setLevel(LEVEL2)
        p = self.Process(target=self._test_level, args=(writer,))
        p.daemon = True
        p.start()
        self.assertEqual(LEVEL2, reader.recv())

        # Restore global logging state.
        root_logger.setLevel(root_level)
        logger.setLevel(level=LOG_LEVEL)
2169
2170
2171# class _TestLoggingProcessName(BaseTestCase):
2172#
2173#     def handle(self, record):
2174#         assert record.processName == multiprocessing.current_process().name
2175#         self.__handled = True
2176#
2177#     def test_logging(self):
2178#         handler = logging.Handler()
2179#         handler.handle = self.handle
2180#         self.__handled = False
2181#         # Bypass getLogger() and side-effects
2182#         logger = logging.getLoggerClass()(
2183#                 'multiprocessing.test.TestLoggingProcessName')
2184#         logger.addHandler(handler)
2185#         logger.propagate = False
2186#
2187#         logger.warn('foo')
2188#         assert self.__handled
2189
2190#
2191# Check that Process.join() retries if os.waitpid() fails with EINTR
2192#
2193
class _TestPollEintr(BaseTestCase):
    """Process.join() must retry when os.waitpid() is interrupted by a
    signal (EINTR) rather than raising."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _killer(cls, pid):
        # Helper process: give the parent time to block in join(), then
        # interrupt it with SIGUSR1.
        time.sleep(0.5)
        os.kill(pid, signal.SIGUSR1)

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_poll_eintr(self):
        fired = [False]

        def note_signal(*args):
            # Handler just records that the signal was delivered.
            fired[0] = True

        pid = os.getpid()
        previous = signal.signal(signal.SIGUSR1, note_signal)
        try:
            interrupter = self.Process(target=self._killer, args=(pid,))
            interrupter.start()
            sleeper = self.Process(target=time.sleep, args=(1,))
            sleeper.start()
            # This join() is hit by SIGUSR1 part-way through and must
            # still wait for the child to finish normally.
            sleeper.join()
            self.assertTrue(fired[0])
            self.assertEqual(sleeper.exitcode, 0)
            interrupter.join()
        finally:
            signal.signal(signal.SIGUSR1, previous)
2221
2222#
2223# Test to verify handle verification, see issue 3321
2224#
2225
class TestInvalidHandle(unittest.TestCase):
    """Connection objects must reject invalid OS handles (issue #3321)."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        # A bogus (but positive) fd: construction succeeds, use fails.
        conn = _multiprocessing.Connection(44977608)
        with self.assertRaises(IOError):
            conn.poll()
        # A negative fd is rejected by the constructor itself.
        with self.assertRaises(IOError):
            _multiprocessing.Connection(-1)
2233
2234#
2235# Functions used to create test cases from the base ones in this module
2236#
2237
def get_attributes(Source, names):
    """Return a dict mapping each name in *names* to the corresponding
    attribute of *Source*.

    Plain functions are wrapped in staticmethod() so the result can be
    injected into a class namespace (see the *Mixin classes below) without
    the functions turning into instance methods.
    """
    d = {}
    for name in names:
        obj = getattr(Source, name)
        # Fix: identity check with `is` instead of `==` -- types are
        # singletons, and this is the idiomatic exact-type test.
        if type(obj) is type(get_attributes):
            obj = staticmethod(obj)
        d[name] = obj
    return d
2246
def create_test_cases(Mixin, type):
    """Generate concrete TestCase classes for every _Test* base class in
    this module whose ALLOWED_TYPES includes *type*, mixing in *Mixin*.

    Returns a dict mapping the new class names (e.g. 'WithProcessesTestX')
    to the generated classes.  (`type` shadows the builtin, but it is part
    of the public keyword interface and so is kept.)
    """
    result = {}
    glob = globals()
    Type = type.capitalize()

    for name in glob.keys():
        if not name.startswith('_Test'):
            continue
        base = glob[name]
        if type not in base.ALLOWED_TYPES:
            continue
        newname = 'With' + Type + name[1:]

        class Temp(base, unittest.TestCase, Mixin):
            pass

        Temp.__name__ = newname
        Temp.__module__ = Mixin.__module__
        result[newname] = Temp
    return result
2263
2264#
2265# Create test cases
2266#
2267
class ProcessesMixin(object):
    # Mixin supplying process-backed implementations of the primitives the
    # _Test* base classes use.
    TYPE = 'processes'
    Process = multiprocessing.Process
    # Inject the multiprocessing module's factories straight into the class
    # namespace (plain functions come back wrapped as staticmethods from
    # get_attributes()).
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue', 'Pool'
        )))

# Generate the WithProcesses* TestCase classes and expose them at module
# level so the unittest loader (and test_main) can find them.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
2280
2281
class ManagerMixin(object):
    # Mixin supplying manager-proxy-backed implementations of the
    # primitives the _Test* base classes use.
    TYPE = 'manager'
    Process = multiprocessing.Process
    # Created with object.__new__, i.e. without running __init__ (so no
    # server process starts at import time); test_main() later calls
    # manager.__init__() and manager.start() before the tests run.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    locals().update(get_attributes(manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
       'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue', 'Pool'
        )))

# Generate the WithManager* TestCase classes and expose them at module level.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
2294
2295
class ThreadsMixin(object):
    # Mixin supplying thread-backed (multiprocessing.dummy) implementations
    # of the primitives the _Test* base classes use.
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Inject the dummy module's factories straight into the class namespace
    # (plain functions come back wrapped as staticmethods).
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue', 'Pool'
        )))

# Generate the WithThreads* TestCase classes and expose them at module level.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
2308
class OtherTest(unittest.TestCase):
    """Tests for the connection authentication handshake helpers."""
    # TODO: add more tests for deliver/answer challenge.

    def test_deliver_challenge_auth_failure(self):
        # The fake peer answers the challenge with garbage, so the serving
        # side must refuse authentication.
        class _FakeConnection(object):
            def recv_bytes(self, size):
                return b'something bogus'

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(
                _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        # The fake peer issues a well-formed challenge but then replies
        # with garbage instead of the welcome message, so the client side
        # must refuse authentication.
        class _FakeConnection(object):
            def __init__(self):
                self.count = 0

            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                elif self.count == 2:
                    return b'something bogus'
                return b''

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(
                _FakeConnection(), b'abc')
2337
2338#
2339# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2340#
2341
def initializer(ns):
    """Pool/Manager initializer callback: bump the shared namespace's
    counter so the parent can verify the initializer actually ran."""
    ns.test = ns.test + 1
2344
class TestInitializers(unittest.TestCase):
    """Manager.start() and Pool() must run their initializer callback in
    the server/worker process (issue #5585)."""

    def setUp(self):
        # Shared namespace whose counter the initializer bumps remotely.
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        manager = multiprocessing.managers.SyncManager()
        # A non-callable initializer is rejected up front.
        self.assertRaises(TypeError, manager.start, 1)
        manager.start(initializer, (self.ns,))
        self.assertEqual(self.ns.test, 1)
        manager.shutdown()

    def test_pool_initializer(self):
        # A non-callable initializer is rejected up front.
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        pool = multiprocessing.Pool(1, initializer, (self.ns,))
        pool.close()
        pool.join()
        self.assertEqual(self.ns.test, 1)
2367
2368#
2369# Issue 5155, 5313, 5331: Test process in processes
2370# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2371#
2372
def _this_sub_process(q):
    """Grandchild process body: do a non-blocking get on *q*, treating an
    empty queue as success -- the point is only that the inherited queue
    can be used without hanging or crashing."""
    try:
        # Fix: drop the unused local binding (`item = ...`); the fetched
        # value was never used.
        q.get(block=False)
    except Queue.Empty:
        pass
2378
def _test_process(q):
    """Child process body: build a queue and hand it to a grandchild.

    NOTE(review): *q* appears to be unused -- the child deliberately makes
    its own queue -- but the parameter is kept because callers pass one in.
    """
    child_queue = multiprocessing.Queue()
    worker = multiprocessing.Process(target=_this_sub_process,
                                     args=(child_queue,))
    worker.daemon = True
    worker.start()
    worker.join()
2385
2386def _afunc(x):
2387    return x*x
2388
def pool_in_process():
    """Create a Pool inside an already-forked process and run a map on it,
    to check that nested pools work (issues 5155/5313/5331)."""
    pool = multiprocessing.Pool(processes=4)
    try:
        # The mapped result is irrelevant (the original bound it to an
        # unused local); completing the map is the test.
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        # Fix: shut the worker processes down instead of leaking the pool.
        pool.close()
        pool.join()
2392
2393class _file_like(object):
2394    def __init__(self, delegate):
2395        self._delegate = delegate
2396        self._pid = None
2397
2398    @property
2399    def cache(self):
2400        pid = os.getpid()
2401        # There are no race conditions since fork keeps only the running thread
2402        if pid != self._pid:
2403            self._pid = pid
2404            self._cache = []
2405        return self._cache
2406
2407    def write(self, data):
2408        self.cache.append(data)
2409
2410    def flush(self):
2411        self._delegate.write(''.join(self.cache))
2412        self._cache = []
2413
class TestStdinBadfiledescriptor(unittest.TestCase):
    """Regression tests for issues 5155/5313/5331: queues, pools and
    stream flushing must work inside child processes."""

    def test_queue_in_process(self):
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_test_process, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): this Process is created but never started; kept to
        # preserve the original test's behaviour.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # Fix: bare `assert` is stripped under `python -O`; use the
        # unittest assertion so the check always runs.
        self.assertEqual(sio.getvalue(), 'foo')
2434
2435#
2436# Test interaction with socket timeouts - see Issue #6056
2437#
2438
class TestTimeouts(unittest.TestCase):
    """A global socket default timeout must not break multiprocessing's
    connection machinery (issue #6056)."""

    @classmethod
    def _test_timeout(cls, child, address):
        # Child: reply over the pipe, then connect back to the listener.
        time.sleep(1)
        child.send(123)
        child.close()
        conn = multiprocessing.connection.Client(address)
        conn.send(456)
        conn.close()

    def test_timeout(self):
        old_timeout = socket.getdefaulttimeout()
        try:
            # Install an aggressive global timeout before any of the
            # multiprocessing sockets are created.
            socket.setdefaulttimeout(0.1)
            parent, child = multiprocessing.Pipe(duplex=True)
            listener = multiprocessing.connection.Listener(family='AF_INET')
            proc = multiprocessing.Process(target=self._test_timeout,
                                           args=(child, listener.address))
            proc.start()
            child.close()
            self.assertEqual(parent.recv(), 123)
            parent.close()
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 456)
            accepted.close()
            listener.close()
            proc.join(10)
        finally:
            # Always restore the process-wide default timeout.
            socket.setdefaulttimeout(old_timeout)
2468
2469#
2470# Test what happens with no "if __name__ == '__main__'"
2471#
2472
class TestNoForkBomb(unittest.TestCase):
    """Spawning a Process from a script without an
    `if __name__ == '__main__'` guard must not fork-bomb."""

    def test_noforkbomb(self):
        name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
        if WIN32:
            # On Windows the child re-imports the main module, so the
            # unguarded script must fail (with RuntimeError on stderr)
            # rather than spawn endlessly.
            rc, out, err = test.script_helper.assert_python_failure(name)
            self.assertEqual(out, '')
            self.assertIn('RuntimeError', err)
        else:
            # On fork-based platforms the script completes and prints 123.
            rc, out, err = test.script_helper.assert_python_ok(name)
            self.assertEqual(out.rstrip(), '123')
            self.assertEqual(err, '')
2484
2485#
2486# Issue 12098: check sys.flags of child matches that for parent
2487#
2488
class TestFlags(unittest.TestCase):
    """Issue #12098: a child process's sys.flags must match its parent's."""

    @classmethod
    def run_in_grandchild(cls, conn):
        # Deepest process: report our interpreter flags to the parent.
        conn.send(tuple(sys.flags))

    @classmethod
    def run_in_child(cls):
        # Middle process: spawn a grandchild, collect its flags, and print
        # both our own and the grandchild's flags as JSON for the top-level
        # test to parse from our stdout.
        import json
        reader, writer = multiprocessing.Pipe(duplex=False)
        proc = multiprocessing.Process(target=cls.run_in_grandchild,
                                       args=(writer,))
        proc.start()
        grandchild_flags = reader.recv()
        proc.join()
        reader.close()
        writer.close()
        flags = (tuple(sys.flags), grandchild_flags)
        print(json.dumps(flags))

    @test_support.requires_unicode  # XXX json needs unicode support
    def test_flags(self):
        import json, subprocess
        # start child process using unusual flags
        prog = ('from test.test_multiprocessing import TestFlags; ' +
                'TestFlags.run_in_child()')
        data = subprocess.check_output(
            [sys.executable, '-E', '-B', '-O', '-c', prog])
        child_flags, grandchild_flags = json.loads(data.decode('ascii'))
        self.assertEqual(child_flags, grandchild_flags)
2517
2518#
2519# Issue #17555: ForkAwareThreadLock
2520#
2521
class TestForkAwareThreadLock(unittest.TestCase):
    # We recurisvely start processes.  Issue #17555 meant that the
    # after fork registry would get duplicate entries for the same
    # lock.  The size of the registry at generation n was ~2**n.

    @classmethod
    def child(cls, n, conn):
        if n > 1:
            # Recurse: fork one more generation.
            p = multiprocessing.Process(target=cls.child, args=(n-1, conn))
            p.start()
            p.join()
        else:
            # Deepest child reports how large the registry has become.
            conn.send(len(util._afterfork_registry))
        conn.close()

    def test_lock(self):
        r, w = multiprocessing.Pipe(False)
        # The lock itself is never used directly; presumably creating it
        # adds an after-fork registry entry whose duplication (pre-fix)
        # this test would detect -- TODO confirm against util internals.
        l = util.ForkAwareThreadLock()
        old_size = len(util._afterfork_registry)
        p = multiprocessing.Process(target=self.child, args=(5, w))
        p.start()
        new_size = r.recv()
        p.join()
        # Five fork generations later, the registry must not have grown.
        self.assertLessEqual(new_size, old_size)
2546
2547#
2548# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
2549#
2550
class TestIgnoreEINTR(unittest.TestCase):
    """Issue #17097: EINTR should be ignored by recv(), send(), accept()
    etc. -- interrupted calls must be retried internally, not fail."""

    @classmethod
    def _test_ignore(cls, conn):
        # No-op handler: SIGUSR1 interrupts blocking calls in this child
        # without terminating it.
        def handler(signum, frame):
            pass
        signal.signal(signal.SIGUSR1, handler)
        conn.send('ready')
        x = conn.recv()
        conn.send(x)
        conn.send_bytes(b'x'*(1024*1024))   # sending 1 MB should block

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_ignore(self):
        conn, child_conn = multiprocessing.Pipe()
        try:
            p = multiprocessing.Process(target=self._test_ignore,
                                        args=(child_conn,))
            p.daemon = True
            p.start()
            child_conn.close()
            self.assertEqual(conn.recv(), 'ready')
            # Signal the child while it blocks in recv(); the recv must be
            # retried rather than failing with EINTR.
            time.sleep(0.1)
            os.kill(p.pid, signal.SIGUSR1)
            time.sleep(0.1)
            conn.send(1234)
            self.assertEqual(conn.recv(), 1234)
            # Signal the child again while it blocks in the 1 MB
            # send_bytes(); the send must likewise survive interruption.
            time.sleep(0.1)
            os.kill(p.pid, signal.SIGUSR1)
            self.assertEqual(conn.recv_bytes(), b'x'*(1024*1024))
            time.sleep(0.1)
            p.join()
        finally:
            conn.close()

    @classmethod
    def _test_ignore_listener(cls, conn):
        # No-op handler, as above: SIGUSR1 merely interrupts accept().
        def handler(signum, frame):
            pass
        signal.signal(signal.SIGUSR1, handler)
        l = multiprocessing.connection.Listener()
        conn.send(l.address)
        a = l.accept()
        a.send('welcome')

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_ignore_listener(self):
        conn, child_conn = multiprocessing.Pipe()
        try:
            p = multiprocessing.Process(target=self._test_ignore_listener,
                                        args=(child_conn,))
            p.daemon = True
            p.start()
            child_conn.close()
            address = conn.recv()
            # Signal the child while it blocks in accept(); the accept
            # must be retried rather than failing with EINTR.
            time.sleep(0.1)
            os.kill(p.pid, signal.SIGUSR1)
            time.sleep(0.1)
            client = multiprocessing.connection.Client(address)
            self.assertEqual(client.recv(), 'welcome')
            p.join()
        finally:
            conn.close()
2614
2615#
2616#
2617#
2618
# TestCase classes that are not parameterized by TYPE; test_main() appends
# these to the suite directly, after the generated With* classes.
testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
                   TestStdinBadfiledescriptor, TestTimeouts, TestNoForkBomb,
                   TestFlags, TestForkAwareThreadLock, TestIgnoreEINTR]
2622
2623#
2624#
2625#
2626
def test_main(run=None):
    """Set up the shared pools/manager, run every generated test case plus
    the testcases_other classes, then tear the shared resources down.

    *run* is a callable taking a TestSuite; defaults to
    test_support.run_unittest.
    """
    if sys.platform.startswith("linux"):
        try:
            # Probe that semaphore creation works at all; the lock object
            # itself is unused.
            lock = multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    check_enough_semaphores()

    if run is None:
        from test.test_support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    # Shared pools/manager used by the mixin-based test cases.  The manager
    # was created with object.__new__ (see ManagerMixin), so run its
    # __init__ here before starting it.
    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    # Sort each generated group by class name for a deterministic order.
    testcases = (
        sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
        sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
        sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
        testcases_other
        )

    loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
    suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
    # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
    # module during these tests is at least platform dependent and possibly
    # non-deterministic on any given platform. So we don't mind if the listed
    # warnings aren't actually raised.
    with test_support.check_py3k_warnings(
            (".+__(get|set)slice__ has been removed", DeprecationWarning),
            (r"sys.exc_clear\(\) not supported", DeprecationWarning),
            quiet=True):
        run(suite)

    # Tear down the shared pools and manager created above.
    ThreadsMixin.pool.terminate()
    ProcessesMixin.pool.terminate()
    ManagerMixin.pool.terminate()
    ManagerMixin.manager.shutdown()

    del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
2674
def main():
    """Script entry point: run the full suite with a verbose text runner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)

if __name__ == '__main__':
    main()
2680