#!/usr/bin/env python

#
# Unit tests for the multiprocessing package
#

import unittest
import Queue
import time
import sys
import os
import gc
import signal
import array
import socket
import random
import logging
from test import test_support
from StringIO import StringIO
_multiprocessing = test_support.import_module('_multiprocessing')
# import threading after _multiprocessing to raise a more relevant error
# message: "No module named _multiprocessing". _multiprocessing is not compiled
# without thread support.
import threading

# Work around broken sem_open implementations
test_support.import_module('multiprocessing.synchronize')

import multiprocessing.dummy
import multiprocessing.connection
import multiprocessing.managers
import multiprocessing.heap
import multiprocessing.pool

from multiprocessing import util

try:
    from multiprocessing.sharedctypes import Value, copy
    HAS_SHAREDCTYPES = True
except ImportError:
    HAS_SHAREDCTYPES = False

#
#
#

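# In this Python 2 version of the tests, str is already a byte string, so
# latin() is simply an alias; it marks literals that must stay bytes when
# exchanged between processes.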
latin = str

#
# Constants
#

LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

DELTA = 0.1
CHECK_TIMINGS = False     # making true makes tests take a lot longer
                          # and can sometimes cause some non-serious
                          # failures because some calls block a bit
                          # longer than expected
if CHECK_TIMINGS:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
else:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1

HAVE_GETVALUE = not getattr(_multiprocessing,
                            'HAVE_BROKEN_SEM_GETVALUE', False)

WIN32 = (sys.platform == "win32")

#
# Some tests require ctypes
#

try:
    from ctypes import Structure, c_int, c_double
except ImportError:
    Structure = object
    c_int = c_double = None

#
# Creates a wrapper for a function which records the time it takes to finish
#

class TimingWrapper(object):

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        t = time.time()
        try:
            return self.func(*args, **kwds)
        finally:
            self.elapsed = time.time() - t

#
# Base class for test cases
#

class BaseTestCase(object):

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        if CHECK_TIMINGS:
            self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        try:
            res = func(*args)
        except NotImplementedError:
            pass
        else:
            return self.assertEqual(value, res)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__

#
# Return the value of a semaphore
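# (multiprocessing and threading semaphores expose the counter under
# different names, hence the chain of fallbacks below)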
#

def get_value(self):
    try:
        return self.get_value()
    except AttributeError:
        try:
            return self._Semaphore__value
        except AttributeError:
            try:
                return self._value
            except AttributeError:
                raise NotImplementedError

#
# Testcases
#

class _TestProcess(BaseTestCase):

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        if self.TYPE == 'threads':
            return

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertTrue(not current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    @classmethod
    def _test(cls, q, *args, **kwds):
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        q = self.Queue(1)
        e = self.Event()
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.exitcode, None)

        p.start()

        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        time.sleep(1000)

    def test_terminate(self):
        if self.TYPE == 'threads':
            return

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        from multiprocessing import forking
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)

#
#
#

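# Process subclass used by _TestSubclassingProcess: it receives strings over
# a pipe and sends back their upper-cased form until told to stop.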
class _UpperCaser(multiprocessing.Process):

    def __init__(self):
        multiprocessing.Process.__init__(self)
        self.child_conn, self.parent_conn = multiprocessing.Pipe()

    def run(self):
        self.parent_conn.close()
        for s in iter(self.child_conn.recv, None):
            self.child_conn.send(s.upper())
        self.child_conn.close()

    def submit(self, s):
        assert type(s) is str
        self.parent_conn.send(s)
        return self.parent_conn.recv()

    def stop(self):
        self.parent_conn.send(None)
        self.parent_conn.close()
        self.child_conn.close()

class _TestSubclassingProcess(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        uppercaser = _UpperCaser()
        uppercaser.start()
        self.assertEqual(uppercaser.submit('hello'), 'HELLO')
        self.assertEqual(uppercaser.submit('world'), 'WORLD')
        uppercaser.stop()
        uppercaser.join()

#
#
#

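# Helpers that fall back to qsize() for queue types which do not provide
# empty()/full().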
def queue_empty(q):
    if hasattr(q, 'empty'):
        return q.empty()
    else:
        return q.qsize() == 0

def queue_full(q, maxsize):
    if hasattr(q, 'full'):
        return q.full()
    else:
        return q.qsize() == maxsize


class _TestQueue(BaseTestCase):


    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        self.assertRaises(Queue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        self.assertRaises(Queue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(Queue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
            self.skipTest("requires 'queue.task_done()' method")

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in xrange(4)]

        for p in workers:
            p.start()

        for i in xrange(10):
            queue.put(i)

        queue.join()

        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()

#
#
#

class _TestLock(BaseTestCase):

    def test_lock(self):
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    def test_rlock(self):
        lock = self.RLock()
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.release(), None)
        self.assertEqual(lock.release(), None)
        self.assertEqual(lock.release(), None)
        self.assertRaises((AssertionError, RuntimeError), lock.release)

    def test_lock_context(self):
        with self.Lock():
            pass


class _TestSemaphore(BaseTestCase):

    def _test_semaphore(self, sem):
        self.assertReturnsIfImplemented(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(2, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(3, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(4, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        self.assertEqual(acquire(False), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, None), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0)

        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)


class _TestCondition(BaseTestCase):

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        p = self.Process(target=self.f, args=(cond, sleeping, woken))
        p.daemon = True
        p.start()

        p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        p.daemon = True
        p.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)
        p.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()

        # wait for them all to sleep
        for i in xrange(6):
            sleeping.acquire()

        # check they have all timed out
        for i in xrange(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()

        # wait for them to all sleep
        for i in xrange(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)


class _TestEvent(BaseTestCase):

    @classmethod
    def _test_event(cls, event):
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # Removed temporarily, due to API shear, this does not
        # work with threading._Event objects. is_set == isSet
        self.assertEqual(event.is_set(), False)

        # Removed, threading.Event.wait() will return the value of the __flag
        # instead of None. API Shear with the semaphore backed mp.Event
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # See note above on the API differences
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        self.Process(target=self._test_event, args=(event,)).start()
        self.assertEqual(wait(), True)

#
#
#

class _TestValue(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        for sv, cv in zip(values, cls.codes_values):
            sv.value = cv[2]


    def test_value(self, raw=False):
        if raw:
            values = [self.RawValue(code, value)
                      for code, value, _ in self.codes_values]
        else:
            values = [self.Value(code, value)
                      for code, value, _ in self.codes_values]

        for sv, cv in zip(values, self.codes_values):
            self.assertEqual(sv.value, cv[1])

        proc = self.Process(target=self._test, args=(values,))
        proc.start()
        proc.join()

        for sv, cv in zip(values, self.codes_values):
            self.assertEqual(sv.value, cv[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        val1 = self.Value('i', 5)
        lock1 = val1.get_lock()
        obj1 = val1.get_obj()

        val2 = self.Value('i', 5, lock=None)
        lock2 = val2.get_lock()
        obj2 = val2.get_obj()

        lock = self.Lock()
        val3 = self.Value('i', 5, lock=lock)
        lock3 = val3.get_lock()
        obj3 = val3.get_obj()
        self.assertEqual(lock, lock3)

        arr4 = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(arr4, 'get_lock'))
        self.assertFalse(hasattr(arr4, 'get_obj'))

        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        arr5 = self.RawValue('i', 5)
        self.assertFalse(hasattr(arr5, 'get_lock'))
        self.assertFalse(hasattr(arr5, 'get_obj'))


class _TestArray(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        for i in range(1, len(seq)):
            seq[i] += seq[i-1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        if raw:
            arr = self.RawArray('i', seq)
        else:
            arr = self.Array('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])

        self.assertEqual(list(arr[:]), seq)

        self.f(seq)

        p = self.Process(target=self.f, args=(arr,))
        p.start()
        p.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        size = 10
        # Test for zeroing (see issue #11675).
        # The repetition below strengthens the test by increasing the chances
        # of previously allocated non-zero memory being used for the new array
        # on the 2nd and 3rd loops.
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            self.assertEqual(list(arr), [0] * size)
            arr[:] = range(10)
            self.assertEqual(list(arr), range(10))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_accepts_long(self):
        arr = self.Array('i', 10L)
        self.assertEqual(len(arr), 10)
        raw_arr = self.RawArray('i', 10L)
        self.assertEqual(len(raw_arr), 10)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        arr1 = self.Array('i', range(10))
        lock1 = arr1.get_lock()
        obj1 = arr1.get_obj()

        arr2 = self.Array('i', range(10), lock=None)
        lock2 = arr2.get_lock()
        obj2 = arr2.get_obj()

        lock = self.Lock()
        arr3 = self.Array('i', range(10), lock=lock)
        lock3 = arr3.get_lock()
        obj3 = arr3.get_obj()
        self.assertEqual(lock, lock3)

        arr4 = self.Array('i', range(10), lock=False)
        self.assertFalse(hasattr(arr4, 'get_lock'))
        self.assertFalse(hasattr(arr4, 'get_obj'))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        arr5 = self.RawArray('i', range(10))
        self.assertFalse(hasattr(arr5, 'get_lock'))
        self.assertFalse(hasattr(arr5, 'get_obj'))

#
#
#

class _TestContainers(BaseTestCase):

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        a = self.list(range(10))
        self.assertEqual(a[:], range(10))

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(range(5))
        self.assertEqual(b[:], range(5))

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2,3,4])

        b *= 2
        self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])

        self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        self.assertEqual(a[:], range(10))

        d = [a, b]
        e = self.list(d)
        self.assertEqual(
            e[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        d = self.dict()
        indices = range(65, 70)
        for i in indices:
            d[i] = chr(i)
        self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
        self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertTrue(not hasattr(n, 'job'))

#
#
#

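# Helper used by the pool tests: optionally sleeps for `wait` seconds, then
# returns the square of its argument.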
def sqr(x, wait=0.0):
    time.sleep(wait)
    return x*x
class _TestPool(BaseTestCase):

    def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
        self.assertEqual(pmap(sqr, range(100), chunksize=20),
                         map(sqr, range(100)))

    def test_map_chunksize(self):
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, range(10))
        self.assertEqual(list(it), map(sqr, range(10)))

        it = self.pool.imap(sqr, range(10))
        for i in range(10):
            self.assertEqual(it.next(), i*i)
        self.assertRaises(StopIteration, it.next)

        it = self.pool.imap(sqr, range(1000), chunksize=100)
        for i in range(1000):
            self.assertEqual(it.next(), i*i)
        self.assertRaises(StopIteration, it.next)

    def test_imap_unordered(self):
        it = self.pool.imap_unordered(sqr, range(1000))
        self.assertEqual(sorted(it), map(sqr, range(1000)))

        it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
        self.assertEqual(sorted(it), map(sqr, range(1000)))

    def test_make_pool(self):
        p = multiprocessing.Pool(3)
        self.assertEqual(3, len(p._pool))
        p.close()
        p.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            return

        result = self.pool.map_async(
            time.sleep, [0.1 for i in range(10000)], chunksize=1
            )
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        self.assertTrue(join.elapsed < 0.2)

class _TestPoolWorkerLifetime(BaseTestCase):

    ALLOWED_TYPES = ('processes', )
    def test_pool_worker_lifetime(self):
        p = multiprocessing.Pool(3, maxtasksperchild=10)
        self.assertEqual(3, len(p._pool))
        origworkerpids = [w.pid for w in p._pool]
        # Run many tasks so each worker gets replaced (hopefully)
        results = []
        for i in range(100):
            results.append(p.apply_async(sqr, (i, )))
        # Fetch the results and verify we got the right answers,
        # also ensuring all the tasks have completed.
        for (j, res) in enumerate(results):
            self.assertEqual(res.get(), sqr(j))
        # Refill the pool
        p._repopulate_pool()
        # Wait until all workers are alive
        # (countdown * DELTA = 5 seconds max startup process time)
        countdown = 50
        while countdown and not all(w.is_alive() for w in p._pool):
            countdown -= 1
            time.sleep(DELTA)
        finalworkerpids = [w.pid for w in p._pool]
        # All pids should be assigned.  See issue #7805.
        self.assertNotIn(None, origworkerpids)
        self.assertNotIn(None, finalworkerpids)
        # Finally, check that the worker pids have changed
        self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
        p.close()
        p.join()

#
# Test that manager has expected number of shared objects left
#

class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            print self.manager._debug_info()
            print debug_info

        self.assertEqual(refs, EXPECTED_NUMBER)

#
# Test of creating a customized manager class
#

from multiprocessing.managers import BaseManager, BaseProxy, RemoteError

class FooBar(object):
    def f(self):
        return 'f()'
    def g(self):
        raise ValueError
    def _h(self):
        return '_h()'

def baz():
    for i in xrange(10):
        yield i*i

class IteratorProxy(BaseProxy):
    _exposed_ = ('next', '__next__')
    def __iter__(self):
        return self
    def next(self):
        return self._callmethod('next')
    def __next__(self):
        return self._callmethod('__next__')

class MyManager(BaseManager):
    pass

MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)


class _TestMyManager(BaseTestCase):

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()

        foo = manager.Foo()
        bar = manager.Bar()
        baz = manager.baz()

        foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
        bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]

        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        self.assertEqual(list(baz), [i*i for i in range(10)])

        manager.shutdown()

#
# Test of connecting to a remote server and using xmlrpclib for serialization
#

_queue = Queue.Queue()
def get_queue():
    return _queue

class QueueManager(BaseManager):
    '''manager class used by server process'''
QueueManager.register('get_queue', callable=get_queue)

class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''
QueueManager2.register('get_queue')


SERIALIZER = 'xmlrpclib'

class _TestRemoteManager(BaseTestCase):

    ALLOWED_TYPES = ('manager',)

    @classmethod
    def _putter(cls, address, authkey):
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.start()

        manager2 = QueueManager2(
            address=manager.address, authkey=authkey, serializer=SERIALIZER
            )
        manager2.connect()
        queue = manager2.get_queue()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        self.assertEqual(queue.get(), ['hello world', None, True, 2.25])

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del queue
        manager.shutdown()

class _TestManagerRestart(BaseTestCase):

    @classmethod
    def _putter(cls, address, authkey):
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.start()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        del queue
        manager.shutdown()
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        manager.start()
        manager.shutdown()

#
#
#

SENTINEL = latin('')

class _TestConnection(BaseTestCase):

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', range(4))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort, e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        if self.TYPE != 'processes':
            return

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)

class _TestListenerClient(BaseTestCase):

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        conn = cls.connection.Client(address)
        conn.send('hello')
        conn.close()

    def test_listener_client(self):
        for family in self.connection.families:
            l = self.connection.Listener(family=family)
            p = self.Process(target=self._test, args=(l.address,))
            p.daemon = True
            p.start()
            conn = l.accept()
            self.assertEqual(conn.recv(), 'hello')
            p.join()
            l.close()
#
# Test of sending connection and socket objects between processes
#
"""
class _TestPicklingConnections(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def _listener(self, conn, families):
        for fam in families:
            l = self.connection.Listener(family=fam)
            conn.send(l.address)
            new_conn = l.accept()
            conn.send(new_conn)

        if self.TYPE == 'processes':
            l = socket.socket()
            l.bind(('localhost', 0))
            conn.send(l.getsockname())
            l.listen(1)
            new_conn, addr = l.accept()
            conn.send(new_conn)

        conn.recv()

    def _remote(self, conn):
        for (address, msg) in iter(conn.recv, None):
            client = self.connection.Client(address)
            client.send(msg.upper())
            client.close()

        if self.TYPE == 'processes':
            address, msg = conn.recv()
            client = socket.socket()
            client.connect(address)
            client.sendall(msg.upper())
            client.close()

        conn.close()

    def test_pickling(self):
        try:
            multiprocessing.allow_connection_pickling()
        except ImportError:
            return

        families = self.connection.families

        lconn, lconn0 = self.Pipe()
        lp = self.Process(target=self._listener, args=(lconn0, families))
        lp.start()
        lconn0.close()

        rconn, rconn0 = self.Pipe()
        rp = self.Process(target=self._remote, args=(rconn0,))
        rp.start()
        rconn0.close()

        for fam in families:
            msg = ('This connection uses family %s' % fam).encode('ascii')
            address = lconn.recv()
            rconn.send((address, msg))
            new_conn = lconn.recv()
            self.assertEqual(new_conn.recv(), msg.upper())

        rconn.send(None)

        if self.TYPE == 'processes':
            msg = latin('This connection uses a normal socket')
            address = lconn.recv()
            rconn.send((address, msg))
            if hasattr(socket, 'fromfd'):
                new_conn = lconn.recv()
                self.assertEqual(new_conn.recv(100), msg.upper())
            else:
                # XXX On Windows with Py2.6 need to backport fromfd()
                discard = lconn.recv_bytes()

        lconn.send(None)

        rconn.close()
        lconn.close()

        lp.join()
        rp.join()
"""
#
#
#

class _TestHeap(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for i in xrange(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            b = multiprocessing.heap.BufferWrapper(size)
            blocks.append(b)
            if len(blocks) > maxblocks:
                i = random.randrange(maxblocks)
                del blocks[i]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap
        all = []
        occupied = 0
        for L in heap._len_to_seq.values():
            for arena, start, stop in L:
                all.append((heap._arenas.index(arena), start, stop,
                            stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            all.append((heap._arenas.index(arena), start, stop,
                        stop-start, 'occupied'))
            occupied += (stop-start)

        all.sort()

        for i in range(len(all)-1):
            (arena, start, stop) = all[i][:3]
            (narena, nstart, nstop) = all[i+1][:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))

#
#
#

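# ctypes structure used by the sharedctypes tests below.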
1636class _Foo(Structure):
1637    _fields_ = [
1638        ('x', c_int),
1639        ('y', c_double)
1640        ]
1641
1642class _TestSharedCTypes(BaseTestCase):
1643
1644    ALLOWED_TYPES = ('processes',)
1645
1646    def setUp(self):
1647        if not HAS_SHAREDCTYPES:
1648            self.skipTest("requires multiprocessing.sharedctypes")
1649
1650    @classmethod
1651    def _double(cls, x, y, foo, arr, string):
1652        x.value *= 2
1653        y.value *= 2
1654        foo.x *= 2
1655        foo.y *= 2
1656        string.value *= 2
1657        for i in range(len(arr)):
1658            arr[i] *= 2
1659
1660    def test_sharedctypes(self, lock=False):
1661        x = Value('i', 7, lock=lock)
1662        y = Value(c_double, 1.0/3.0, lock=lock)
1663        foo = Value(_Foo, 3, 2, lock=lock)
1664        arr = self.Array('d', range(10), lock=lock)
1665        string = self.Array('c', 20, lock=lock)
1666        string.value = latin('hello')
1667
1668        p = self.Process(target=self._double, args=(x, y, foo, arr, string))
1669        p.start()
1670        p.join()
1671
1672        self.assertEqual(x.value, 14)
1673        self.assertAlmostEqual(y.value, 2.0/3.0)
1674        self.assertEqual(foo.x, 6)
1675        self.assertAlmostEqual(foo.y, 4.0)
1676        for i in range(10):
1677            self.assertAlmostEqual(arr[i], i*2)
1678        self.assertEqual(string.value, latin('hellohello'))
1679
1680    def test_synchronize(self):
1681        self.test_sharedctypes(lock=True)
1682
1683    def test_copy(self):
1684        foo = _Foo(2, 5.0)
1685        bar = copy(foo)
1686        foo.x = 0
1687        foo.y = 0
1688        self.assertEqual(bar.x, 2)
1689        self.assertAlmostEqual(bar.y, 5.0)
1690
#
#
#

class _TestFinalize(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function, then exit the process
        # without garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._test_finalize, args=(child_conn,))
        p.start()
        p.join()

        result = [obj for obj in iter(conn.recv, 'STOP')]
        # 'a' and 'b' were sent before exit; at exit, finalizers run from the
        # highest exitpriority down, ties in reverse registration order, and
        # finalizers without an exitpriority (here 'c') are not run at all.
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])

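# --- Illustrative sketch (not part of the test suite) -----------------------
# A hedged, standalone rehearsal of util.Finalize as exercised above: the
# returned finalizer is callable, fires at most once, and would also run when
# the tracked object is collected or (given an exitpriority) at exit.

def _sketch_finalize():
    class _Resource(object):
        pass

    messages = []
    res = _Resource()
    finalize = util.Finalize(res, messages.append, args=('closed',),
                             exitpriority=0)
    finalize()                  # runs the callback ...
    finalize()                  # ... and is a no-op afterwards
    return messages             # ['closed']
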
#
# Test that from ... import * works for each module
#

class _TestImportStar(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        modules = [
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.reduction',
            'multiprocessing.synchronize', 'multiprocessing.util'
            ]

        if c_int is not None:
            # This module requires _ctypes
            modules.append('multiprocessing.sharedctypes')

        for name in modules:
            __import__(name)
            mod = sys.modules[name]

            for attr in getattr(mod, '__all__', ()):
                self.assertTrue(
                    hasattr(mod, attr),
                    '%r does not have attribute %r' % (mod, attr)
                    )

#
# Quick test that logging works -- does not test logging output
#

class _TestLogging(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        logger.setLevel(util.SUBWARNING)
        self.assertTrue(logger is not None)
        logger.debug('this will not be printed')
        logger.info('nor will this')
        logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)

        logger.setLevel(LEVEL1)
        self.Process(target=self._test_level, args=(writer,)).start()
        self.assertEqual(LEVEL1, reader.recv())

        logger.setLevel(logging.NOTSET)
        root_logger.setLevel(LEVEL2)
        self.Process(target=self._test_level, args=(writer,)).start()
        self.assertEqual(LEVEL2, reader.recv())

        root_logger.setLevel(root_level)
        logger.setLevel(level=LOG_LEVEL)


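# --- Illustrative sketch (not part of the test suite) -----------------------
# Typical way to make multiprocessing's diagnostics visible outside the tests,
# assuming the public helpers get_logger()/log_to_stderr(); the tests above
# only check the level plumbing, not any output.

def _sketch_enable_mp_logging(level=logging.INFO):
    logger = multiprocessing.log_to_stderr()    # get_logger() plus a stderr handler
    logger.setLevel(level)
    logger.info('pool and worker lifecycle messages are now visible')
    return logger
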
# class _TestLoggingProcessName(BaseTestCase):
#
#     def handle(self, record):
#         assert record.processName == multiprocessing.current_process().name
#         self.__handled = True
#
#     def test_logging(self):
#         handler = logging.Handler()
#         handler.handle = self.handle
#         self.__handled = False
#         # Bypass getLogger() and side-effects
#         logger = logging.getLoggerClass()(
#                 'multiprocessing.test.TestLoggingProcessName')
#         logger.addHandler(handler)
#         logger.propagate = False
#
#         logger.warn('foo')
#         assert self.__handled

#
# Test handle verification, see issue 3321
#

class TestInvalidHandle(unittest.TestCase):

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        conn = _multiprocessing.Connection(44977608)
        self.assertRaises(IOError, conn.poll)
        self.assertRaises(IOError, _multiprocessing.Connection, -1)

#
# Functions used to create test cases from the base ones in this module
#

def get_attributes(Source, names):
    d = {}
    for name in names:
        obj = getattr(Source, name)
        if type(obj) == type(get_attributes):
            # plain functions must be wrapped so they do not turn into
            # unbound methods when placed in the mixin class namespace
            obj = staticmethod(obj)
        d[name] = obj
    return d

def create_test_cases(Mixin, type):
    result = {}
    glob = globals()
    Type = type.capitalize()

    for name in glob.keys():
        if name.startswith('_Test'):
            base = glob[name]
            if type in base.ALLOWED_TYPES:
                newname = 'With' + Type + name[1:]
                class Temp(base, unittest.TestCase, Mixin):
                    pass
                result[newname] = Temp
                Temp.__name__ = newname
                Temp.__module__ = Mixin.__module__
    return result

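# --- Illustrative sketch (not part of the test suite) -----------------------
# What the factory above produces: for a base class named '_TestQueue' and
# type='processes' it would publish 'WithProcessesTestQueue', derived from
# (base, unittest.TestCase, mixin), where the mixins are defined just below.
# A hedged rehearsal of only the renaming scheme (the base names here are
# hypothetical):

def _sketch_generated_names():
    names = []
    for base_name in ('_TestQueue', '_TestLock'):
        for kind in ('processes', 'threads', 'manager'):
            names.append('With' + kind.capitalize() + base_name[1:])
    return names        # e.g. ['WithProcessesTestQueue', 'WithThreadsTestQueue', ...]
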
#
# Create test cases
#

class ProcessesMixin(object):
    TYPE = 'processes'
    Process = multiprocessing.Process
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue'
        )))

testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)


class ManagerMixin(object):
    TYPE = 'manager'
    Process = multiprocessing.Process
    manager = object.__new__(multiprocessing.managers.SyncManager)
    locals().update(get_attributes(manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue'
        )))

testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)


class ThreadsMixin(object):
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue'
        )))

testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)

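# --- Illustrative sketch (not part of the test suite) -----------------------
# How a generated class is meant to work: every test written against
# self.Process / self.Queue / ... resolves those names through the mixin it
# was combined with.  A hedged check using the threads (dummy) flavour; the
# class name is hypothetical and only exists if a matching _Test* base does.

def _sketch_mixin_dispatch():
    cls = globals().get('WithThreadsTestLock')
    if cls is None:
        return None
    return cls.Process is multiprocessing.dummy.Process    # expected: True
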
class OtherTest(unittest.TestCase):
    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        class _FakeConnection(object):
            def recv_bytes(self, size):
                return b'something bogus'
            def send_bytes(self, data):
                pass
        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.deliver_challenge,
                          _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        class _FakeConnection(object):
            def __init__(self):
                self.count = 0
            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                elif self.count == 2:
                    return b'something bogus'
                return b''
            def send_bytes(self, data):
                pass
        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.answer_challenge,
                          _FakeConnection(), b'abc')

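# --- Illustrative sketch (not part of the test suite) -----------------------
# The challenge/response rejected above is the handshake Listener/Client run
# when an authkey is supplied.  A hedged end-to-end example; the address and
# key below are arbitrary, and the server side runs in a thread so that both
# ends live in this process.

def _sketch_auth_handshake():
    from multiprocessing.connection import Listener, Client

    listener = Listener(('localhost', 0), authkey=b'secret password')

    def _serve():
        conn = listener.accept()        # challenge exchanged during accept()
        conn.send('authenticated')
        conn.close()

    t = threading.Thread(target=_serve)
    t.start()
    client = Client(listener.address, authkey=b'secret password')
    reply = client.recv()               # the client side answered the challenge
    client.close()
    t.join()
    listener.close()
    return reply                        # 'authenticated'
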
#
# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
#

def initializer(ns):
    ns.test += 1

class TestInitializers(unittest.TestCase):
    def setUp(self):
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        m = multiprocessing.managers.SyncManager()
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        self.assertEqual(self.ns.test, 1)
        m.shutdown()

    def test_pool_initializer(self):
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        self.assertEqual(self.ns.test, 1)

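# --- Illustrative sketch (not part of the test suite) -----------------------
# Typical use of the initializer hook tested above: per-worker setup that runs
# once in each worker process before any task.  The helper names and the
# module-level dict are hypothetical.

_sketch_worker_state = {}

def _sketch_init_worker(level):
    # executed once in every worker process
    _sketch_worker_state['level'] = level

def _sketch_square(x):
    return x * x

def _sketch_pool_with_initializer():
    pool = multiprocessing.Pool(processes=2,
                                initializer=_sketch_init_worker,
                                initargs=(7,))
    try:
        return pool.map(_sketch_square, [1, 2, 3])      # [1, 4, 9]
    finally:
        pool.close()
        pool.join()
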
#
# Issue 5155, 5313, 5331: Test process in processes
# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
#

def _ThisSubProcess(q):
    try:
        item = q.get(block=False)
    except Queue.Empty:
        pass

def _TestProcess(q):
    queue = multiprocessing.Queue()
    subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
    subProc.start()
    subProc.join()

def _afunc(x):
    return x*x

def pool_in_process():
    pool = multiprocessing.Pool(processes=4)
    x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])

class _file_like(object):
    def __init__(self, delegate):
        self._delegate = delegate
        self._pid = None

    @property
    def cache(self):
        pid = os.getpid()
        # There are no race conditions here: after a fork only the forking
        # thread survives in the child, so the pid check cannot interleave.
        if pid != self._pid:
            self._pid = pid
            self._cache = []
        return self._cache

    def write(self, data):
        self.cache.append(data)

    def flush(self):
        self._delegate.write(''.join(self.cache))
        self._cache = []

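# --- Illustrative sketch (not part of the test suite) -----------------------
# What _file_like stands in for: a stdout-like wrapper whose buffered data is
# keyed to the current pid, so a forked worker starts with an empty cache
# instead of re-flushing output buffered by its parent.  Hedged usage against
# a plain StringIO:

def _sketch_file_like_usage():
    sio = StringIO()
    wrapper = _file_like(sio)
    wrapper.write('parent output\n')
    wrapper.flush()                 # joins the cache and writes it through
    return sio.getvalue()           # 'parent output\n'
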
class TestStdinBadfiledescriptor(unittest.TestCase):

    def test_queue_in_process(self):
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # note: proc is never started; only the parent's flush is exercised
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        self.assertEqual(sio.getvalue(), 'foo')

testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
                   TestStdinBadfiledescriptor]

#
#
#

def test_main(run=None):
    if sys.platform.startswith("linux"):
        try:
            lock = multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError is raised on RLock creation, see issue 3111!")

    if run is None:
        from test.test_support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    testcases = (
        sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
        sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
        sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
        testcases_other
        )

    loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
    suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
    # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
    # module during these tests is at least platform dependent and possibly
    # non-deterministic on any given platform. So we don't mind if the listed
    # warnings aren't actually raised.
    with test_support.check_py3k_warnings(
            (".+__(get|set)slice__ has been removed", DeprecationWarning),
            (r"sys.exc_clear\(\) not supported", DeprecationWarning),
            quiet=True):
        run(suite)

    ThreadsMixin.pool.terminate()
    ProcessesMixin.pool.terminate()
    ManagerMixin.pool.terminate()
    ManagerMixin.manager.shutdown()

    del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool

def main():
    test_main(unittest.TextTestRunner(verbosity=2).run)

if __name__ == '__main__':
    main()