• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#
2# Unit tests for the multiprocessing package
3#
4
5import unittest
6import queue as pyqueue
7import contextlib
8import time
9import io
10import itertools
11import sys
12import os
13import gc
14import errno
15import signal
16import array
17import socket
18import random
19import logging
20import struct
21import operator
22import weakref
23import test.support
24import test.support.script_helper
25from test import support
26
27
28# Skip tests if _multiprocessing wasn't built.
29_multiprocessing = test.support.import_module('_multiprocessing')
30# Skip tests if sem_open implementation is broken.
31test.support.import_module('multiprocessing.synchronize')
32# import threading after _multiprocessing to raise a more relevant error
33# message: "No module named _multiprocessing". _multiprocessing is not compiled
34# without thread support.
35import threading
36
37import multiprocessing.connection
38import multiprocessing.dummy
39import multiprocessing.heap
40import multiprocessing.managers
41import multiprocessing.pool
42import multiprocessing.queues
43
44from multiprocessing import util
45
46try:
47    from multiprocessing import reduction
48    HAS_REDUCTION = reduction.HAVE_SEND_HANDLE
49except ImportError:
50    HAS_REDUCTION = False
51
52try:
53    from multiprocessing.sharedctypes import Value, copy
54    HAS_SHAREDCTYPES = True
55except ImportError:
56    HAS_SHAREDCTYPES = False
57
58try:
59    import msvcrt
60except ImportError:
61    msvcrt = None
62
63#
64#
65#
66
67# Timeout to wait until a process completes
68TIMEOUT = 30.0 # seconds
69
def latin(s):
    """Return *s* encoded with the Latin-1 ("latin") codec."""
    encoded = s.encode('latin')
    return encoded
72
73
def close_queue(queue):
    """Close *queue* and wait for its feeder thread to finish.

    Only real multiprocessing queues are closed; other queue-like
    objects (e.g. manager proxies, dummy queues) are left untouched.
    """
    if not isinstance(queue, multiprocessing.queues.Queue):
        return
    queue.close()
    queue.join_thread()
78
79
def join_process(process):
    # Since multiprocessing.Process has the same API as threading.Thread
    # (join() and is_alive()), the threading support helper can be reused.
    # TIMEOUT bounds the wait so a hung child cannot stall the whole run.
    support.join_thread(process, timeout=TIMEOUT)
84
85
86#
87# Constants
88#
89
# Log level used when test logging is enabled; switch to the commented-out
# DEBUG line below for verbose multiprocessing internals.
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

# Short delay used throughout to let buffered/pipelined operations settle.
DELTA = 0.1
CHECK_TIMINGS = False     # making true makes tests take a lot longer
                          # and can sometimes cause some non-serious
                          # failures because some calls block a bit
                          # longer than expected
# Timeouts used by the blocking put/get tests; the larger values are only
# meaningful when CHECK_TIMINGS verifies elapsed times.
if CHECK_TIMINGS:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
else:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1

# Whether the platform's semaphores support reading their current value.
HAVE_GETVALUE = not getattr(_multiprocessing,
                            'HAVE_BROKEN_SEM_GETVALUE', False)

WIN32 = (sys.platform == "win32")
107
108from multiprocessing.connection import wait
109
def wait_for_handle(handle, timeout):
    """Wait until *handle* is ready, returning the list of ready objects.

    A negative timeout is normalized to None, i.e. block indefinitely.
    """
    effective = None if (timeout is not None and timeout < 0.0) else timeout
    return wait([handle], effective)
114
# Upper bound on file descriptor numbers for this platform; fall back to a
# conservative 256 when the value cannot be queried.
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    # sysconf() missing (e.g. Windows), name unknown, or the query failed.
    # Previously a bare `except:` was used, which would also swallow
    # KeyboardInterrupt/SystemExit raised during import.
    MAXFD = 256

# To speed up tests when using the forkserver, we can preload these:
PRELOAD = ['__main__', 'test.test_multiprocessing_forkserver']
122
123#
124# Some tests require ctypes
125#
126
# ctypes is optional on some builds: provide inert placeholders so that
# test classes using Structure/c_* as defaults can still be defined; the
# affected tests are skipped at run time when c_int et al. are None.
try:
    from ctypes import Structure, c_int, c_double, c_longlong
except ImportError:
    Structure = object
    c_int = c_double = c_longlong = None
132
133
def check_enough_semaphores():
    """Check that the system supports enough semaphores to run the test."""
    # POSIX guarantees at least this many semaphores.
    nsems_min = 256
    try:
        available = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # No sysconf(), or this name is not known on the platform.
        return
    if available != -1 and available < nsems_min:
        raise unittest.SkipTest("The OS doesn't support enough semaphores "
                                "to run the test (required: %d)." % nsems_min)
147
148
149#
150# Creates a wrapper for a function which records the time it takes to finish
151#
152
class TimingWrapper(object):
    """Callable proxy that records how long each call to *func* takes.

    The duration of the most recent call (in seconds, measured with the
    monotonic clock) is stored in ``self.elapsed`` even when the call
    raises.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None   # duration of the most recent call, or None

    def __call__(self, *args, **kwds):
        start = time.monotonic()
        try:
            result = self.func(*args, **kwds)
        finally:
            self.elapsed = time.monotonic() - start
        return result
165
166#
167# Base class for test cases
168#
169
class BaseTestCase(object):
    """Mixin of assertion helpers shared by the concrete test flavours
    ('processes', 'manager', 'threads')."""

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        # Timing assertions only run when CHECK_TIMINGS is enabled; by
        # default they are no-ops because timings are too flaky in CI.
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        # Tolerate platforms where the queried API raises
        # NotImplementedError (e.g. broken sem_getvalue()).
        try:
            res = func(*args)
        except NotImplementedError:
            return
        return self.assertEqual(value, res)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
192
193#
194# Return the value of a semaphore
195#
196
def get_value(self):
    """Return the current value of a semaphore-like object.

    Tries, in order, the public get_value() API and then the private
    attribute spellings used by the various implementations; raises
    NotImplementedError when none of them exists.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    try:
        return self._Semaphore__value
    except AttributeError:
        pass
    try:
        return self._value
    except AttributeError:
        raise NotImplementedError
208
209#
210# Testcases
211#
212
class DummyCallable:
    """Picklable callable target; used to check that a Process drops its
    reference to the target (and its args) after running."""
    def __call__(self, q, c):
        # The companion argument must survive the round-trip as this type.
        assert isinstance(c, DummyCallable)
        q.put(5)
217
218
class _TestProcess(BaseTestCase):
    """Tests for the Process API, run in both the 'processes' and
    'threads' flavours (self.Process, self.Queue, etc. are bound by the
    concrete subclass that mixes this in)."""

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertTrue(not current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    def test_daemon_argument(self):
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        # By default uses the current process's daemon flag.
        proc0 = self.Process(target=self._test)
        self.assertEqual(proc0.daemon, self.current_process().daemon)
        proc1 = self.Process(target=self._test, daemon=True)
        self.assertTrue(proc1.daemon)
        proc2 = self.Process(target=self._test, daemon=False)
        self.assertFalse(proc2.daemon)

    @classmethod
    def _test(cls, q, *args, **kwds):
        # Child side: echo back args/kwargs and identity info via the queue.
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        q = self.Queue(1)
        e = self.Event()
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.exitcode, None)

        p.start()

        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # The child received everything after the queue itself.
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())
        close_queue(q)

    @classmethod
    def _sleep_some(cls):
        time.sleep(100)

    @classmethod
    def _test_sleep(cls, delay):
        time.sleep(delay)

    def _kill_process(self, meth):
        # Common driver for test_terminate/test_kill: start a sleeping
        # child, kill it with *meth*, and return its exit code.
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        p = self.Process(target=self._sleep_some)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        join = TimingWrapper(p.join)

        self.assertEqual(join(0), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)
        self.assertEqual(p.is_alive(), True)

        self.assertEqual(join(-1), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)
        self.assertEqual(p.is_alive(), True)

        # XXX maybe terminating too soon causes the problems on Gentoo...
        time.sleep(1)

        meth(p)

        if hasattr(signal, 'alarm'):
            # On the Gentoo buildbot waitpid() often seems to block forever.
            # We use alarm() to interrupt it if it blocks for too long.
            def handler(*args):
                raise RuntimeError('join took too long: %s' % p)
            old_handler = signal.signal(signal.SIGALRM, handler)
            try:
                signal.alarm(10)
                self.assertEqual(join(), None)
            finally:
                signal.alarm(0)
                signal.signal(signal.SIGALRM, old_handler)
        else:
            self.assertEqual(join(), None)

        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        return p.exitcode

    def test_terminate(self):
        exitcode = self._kill_process(multiprocessing.Process.terminate)
        if os.name != 'nt':
            self.assertEqual(exitcode, -signal.SIGTERM)

    def test_kill(self):
        exitcode = self._kill_process(multiprocessing.Process.kill)
        if os.name != 'nt':
            self.assertEqual(exitcode, -signal.SIGKILL)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.daemon = True
        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        # Each invocation reports its id then spawns two children (up to
        # depth 2), producing a depth-first ordering of ids on the pipe.
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)

    @classmethod
    def _test_sentinel(cls, event):
        event.wait(10.0)

    def test_sentinel(self):
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        event = self.Event()
        p = self.Process(target=self._test_sentinel, args=(event,))
        # sentinel is only defined once the process has been started.
        with self.assertRaises(ValueError):
            p.sentinel
        p.start()
        self.addCleanup(p.join)
        sentinel = p.sentinel
        self.assertIsInstance(sentinel, int)
        self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
        event.set()
        p.join()
        self.assertTrue(wait_for_handle(sentinel, timeout=1))

    @classmethod
    def _test_close(cls, rc=0, q=None):
        if q is not None:
            q.get()
        sys.exit(rc)

    def test_close(self):
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        q = self.Queue()
        p = self.Process(target=self._test_close, kwargs={'q': q})
        p.daemon = True
        p.start()
        self.assertEqual(p.is_alive(), True)
        # Child is still alive, cannot close
        with self.assertRaises(ValueError):
            p.close()

        q.put(None)
        p.join()
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.exitcode, 0)
        p.close()
        # After close() the Process object is unusable...
        with self.assertRaises(ValueError):
            p.is_alive()
        with self.assertRaises(ValueError):
            p.join()
        with self.assertRaises(ValueError):
            p.terminate()
        # ...but close() itself stays idempotent.
        p.close()

        wr = weakref.ref(p)
        del p
        gc.collect()
        self.assertIs(wr(), None)

        close_queue(q)

    def test_many_processes(self):
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sm = multiprocessing.get_start_method()
        N = 5 if sm == 'spawn' else 100

        # Try to overwhelm the forkserver loop with events
        procs = [self.Process(target=self._test_sleep, args=(0.01,))
                 for i in range(N)]
        for p in procs:
            p.start()
        for p in procs:
            join_process(p)
        for p in procs:
            self.assertEqual(p.exitcode, 0)

        procs = [self.Process(target=self._sleep_some)
                 for i in range(N)]
        for p in procs:
            p.start()
        time.sleep(0.001)  # let the children start...
        for p in procs:
            p.terminate()
        for p in procs:
            join_process(p)
        if os.name != 'nt':
            exitcodes = [-signal.SIGTERM]
            if sys.platform == 'darwin':
                # bpo-31510: On macOS, killing a freshly started process with
                # SIGTERM sometimes kills the process with SIGKILL.
                exitcodes.append(-signal.SIGKILL)
            for p in procs:
                self.assertIn(p.exitcode, exitcodes)

    def test_lose_target_ref(self):
        c = DummyCallable()
        wr = weakref.ref(c)
        q = self.Queue()
        p = self.Process(target=c, args=(q, c))
        del c
        p.start()
        p.join()
        # The Process must not keep the target alive after running.
        self.assertIs(wr(), None)
        self.assertEqual(q.get(), 5)
        close_queue(q)

    @classmethod
    def _test_child_fd_inflation(self, evt, q):
        q.put(test.support.fd_count())
        evt.wait()

    def test_child_fd_inflation(self):
        # Number of fds in child processes should not grow with the
        # number of running children.
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sm = multiprocessing.get_start_method()
        if sm == 'fork':
            # The fork method by design inherits all fds from the parent,
            # trying to go against it is a lost battle
            self.skipTest('test not appropriate for {}'.format(sm))

        N = 5
        evt = self.Event()
        q = self.Queue()

        procs = [self.Process(target=self._test_child_fd_inflation, args=(evt, q))
                 for i in range(N)]
        for p in procs:
            p.start()

        try:
            fd_counts = [q.get() for i in range(N)]
            self.assertEqual(len(set(fd_counts)), 1, fd_counts)

        finally:
            evt.set()
            for p in procs:
                p.join()
            close_queue(q)

    @classmethod
    def _test_wait_for_threads(self, evt):
        def func1():
            time.sleep(0.5)
            evt.set()

        def func2():
            time.sleep(20)
            evt.clear()

        threading.Thread(target=func1).start()
        threading.Thread(target=func2, daemon=True).start()

    def test_wait_for_threads(self):
        # A child process should wait for non-daemonic threads to end
        # before exiting
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        evt = self.Event()
        proc = self.Process(target=self._test_wait_for_threads, args=(evt,))
        proc.start()
        proc.join()
        self.assertTrue(evt.is_set())

    @classmethod
    def _test_error_on_stdio_flush(self, evt, break_std_streams={}):
        # NOTE: the mutable default dict is only ever read, never mutated.
        for stream_name, action in break_std_streams.items():
            if action == 'close':
                stream = io.StringIO()
                stream.close()
            else:
                assert action == 'remove'
                stream = None
            # BUG FIX: previously this always installed None, so `stream`
            # was unused and the 'close' action (a closed StringIO) was
            # never actually exercised.
            setattr(sys, stream_name, stream)
        evt.set()

    def test_error_on_stdio_flush_1(self):
        # Check that Process works with broken standard streams
        streams = [io.StringIO(), None]
        streams[0].close()
        for stream_name in ('stdout', 'stderr'):
            for stream in streams:
                old_stream = getattr(sys, stream_name)
                setattr(sys, stream_name, stream)
                try:
                    evt = self.Event()
                    proc = self.Process(target=self._test_error_on_stdio_flush,
                                        args=(evt,))
                    proc.start()
                    proc.join()
                    self.assertTrue(evt.is_set())
                    self.assertEqual(proc.exitcode, 0)
                finally:
                    setattr(sys, stream_name, old_stream)

    def test_error_on_stdio_flush_2(self):
        # Same as test_error_on_stdio_flush_1(), but standard streams are
        # broken by the child process
        for stream_name in ('stdout', 'stderr'):
            for action in ('close', 'remove'):
                old_stream = getattr(sys, stream_name)
                try:
                    evt = self.Event()
                    proc = self.Process(target=self._test_error_on_stdio_flush,
                                        args=(evt, {stream_name: action}))
                    proc.start()
                    proc.join()
                    self.assertTrue(evt.is_set())
                    self.assertEqual(proc.exitcode, 0)
                finally:
                    setattr(sys, stream_name, old_stream)

    @classmethod
    def _sleep_and_set_event(self, evt, delay=0.0):
        time.sleep(delay)
        evt.set()

    def check_forkserver_death(self, signum):
        # bpo-31308: if the forkserver process has died, we should still
        # be able to create and run new Process instances (the forkserver
        # is implicitly restarted).
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        sm = multiprocessing.get_start_method()
        if sm != 'forkserver':
            # The fork method by design inherits all fds from the parent,
            # trying to go against it is a lost battle
            self.skipTest('test not appropriate for {}'.format(sm))

        from multiprocessing.forkserver import _forkserver
        _forkserver.ensure_running()

        # First process sleeps 500 ms
        delay = 0.5

        evt = self.Event()
        proc = self.Process(target=self._sleep_and_set_event, args=(evt, delay))
        proc.start()

        pid = _forkserver._forkserver_pid
        os.kill(pid, signum)
        # give time to the fork server to die and time to proc to complete
        time.sleep(delay * 2.0)

        evt2 = self.Event()
        proc2 = self.Process(target=self._sleep_and_set_event, args=(evt2,))
        proc2.start()
        proc2.join()
        self.assertTrue(evt2.is_set())
        self.assertEqual(proc2.exitcode, 0)

        proc.join()
        self.assertTrue(evt.is_set())
        self.assertIn(proc.exitcode, (0, 255))

    def test_forkserver_sigint(self):
        # Catchable signal
        self.check_forkserver_death(signal.SIGINT)

    def test_forkserver_sigkill(self):
        # Uncatchable signal
        if os.name != 'nt':
            self.check_forkserver_death(signal.SIGKILL)
685
686
687#
688#
689#
690
691class _UpperCaser(multiprocessing.Process):
692
693    def __init__(self):
694        multiprocessing.Process.__init__(self)
695        self.child_conn, self.parent_conn = multiprocessing.Pipe()
696
697    def run(self):
698        self.parent_conn.close()
699        for s in iter(self.child_conn.recv, None):
700            self.child_conn.send(s.upper())
701        self.child_conn.close()
702
703    def submit(self, s):
704        assert type(s) is str
705        self.parent_conn.send(s)
706        return self.parent_conn.recv()
707
708    def stop(self):
709        self.parent_conn.send(None)
710        self.parent_conn.close()
711        self.child_conn.close()
712
class _TestSubclassingProcess(BaseTestCase):
    """Tests for subclasses of multiprocessing.Process and for process
    shutdown behaviour ('processes' flavour only)."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        # Exercise the _UpperCaser Process subclass end to end.
        uppercaser = _UpperCaser()
        uppercaser.daemon = True
        uppercaser.start()
        self.assertEqual(uppercaser.submit('hello'), 'HELLO')
        self.assertEqual(uppercaser.submit('world'), 'WORLD')
        uppercaser.stop()
        uppercaser.join()

    def test_stderr_flush(self):
        # sys.stderr is flushed at process shutdown (issue #13812)
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        testfn = test.support.TESTFN
        self.addCleanup(test.support.unlink, testfn)
        proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
        proc.start()
        proc.join()
        with open(testfn, 'r') as f:
            err = f.read()
            # The whole traceback was printed
            self.assertIn("ZeroDivisionError", err)
            self.assertIn("test_multiprocessing.py", err)
            self.assertIn("1/0 # MARKER", err)

    @classmethod
    def _test_stderr_flush(cls, testfn):
        # Redirect stderr to a file, then crash; the traceback must be
        # flushed to the file by interpreter shutdown.
        fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
        sys.stderr = open(fd, 'w', closefd=False)
        1/0 # MARKER


    @classmethod
    def _test_sys_exit(cls, reason, testfn):
        # Redirect stderr to a file and exit with a non-int reason; the
        # reason must be printed and the exit code forced to 1.
        fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
        sys.stderr = open(fd, 'w', closefd=False)
        sys.exit(reason)

    def test_sys_exit(self):
        # See Issue 13854
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        testfn = test.support.TESTFN
        self.addCleanup(test.support.unlink, testfn)

        # Non-integer reasons: printed to stderr, exit code becomes 1.
        for reason in (
            [1, 2, 3],
            'ignore this',
        ):
            p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
            p.daemon = True
            p.start()
            join_process(p)
            self.assertEqual(p.exitcode, 1)

            with open(testfn, 'r') as f:
                content = f.read()
            self.assertEqual(content.rstrip(), str(reason))

            os.unlink(testfn)

        # Integer-like reasons map directly to the exit code.
        for reason in (True, False, 8):
            p = self.Process(target=sys.exit, args=(reason,))
            p.daemon = True
            p.start()
            join_process(p)
            self.assertEqual(p.exitcode, reason)
786
787#
788#
789#
790
def queue_empty(q):
    """Portable emptiness check: some queue proxies lack empty()."""
    probe = getattr(q, 'empty', None)
    if probe is not None:
        return probe()
    return q.qsize() == 0
796
def queue_full(q, maxsize):
    """Portable fullness check: fall back to comparing qsize() with
    *maxsize* when the queue object lacks full()."""
    probe = getattr(q, 'full', None)
    if probe is not None:
        return probe()
    return q.qsize() == maxsize
802
803
804class _TestQueue(BaseTestCase):
805
806
807    @classmethod
808    def _test_put(cls, queue, child_can_start, parent_can_continue):
809        child_can_start.wait()
810        for i in range(6):
811            queue.get()
812        parent_can_continue.set()
813
814    def test_put(self):
815        MAXSIZE = 6
816        queue = self.Queue(maxsize=MAXSIZE)
817        child_can_start = self.Event()
818        parent_can_continue = self.Event()
819
820        proc = self.Process(
821            target=self._test_put,
822            args=(queue, child_can_start, parent_can_continue)
823            )
824        proc.daemon = True
825        proc.start()
826
827        self.assertEqual(queue_empty(queue), True)
828        self.assertEqual(queue_full(queue, MAXSIZE), False)
829
830        queue.put(1)
831        queue.put(2, True)
832        queue.put(3, True, None)
833        queue.put(4, False)
834        queue.put(5, False, None)
835        queue.put_nowait(6)
836
837        # the values may be in buffer but not yet in pipe so sleep a bit
838        time.sleep(DELTA)
839
840        self.assertEqual(queue_empty(queue), False)
841        self.assertEqual(queue_full(queue, MAXSIZE), True)
842
843        put = TimingWrapper(queue.put)
844        put_nowait = TimingWrapper(queue.put_nowait)
845
846        self.assertRaises(pyqueue.Full, put, 7, False)
847        self.assertTimingAlmostEqual(put.elapsed, 0)
848
849        self.assertRaises(pyqueue.Full, put, 7, False, None)
850        self.assertTimingAlmostEqual(put.elapsed, 0)
851
852        self.assertRaises(pyqueue.Full, put_nowait, 7)
853        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
854
855        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
856        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
857
858        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
859        self.assertTimingAlmostEqual(put.elapsed, 0)
860
861        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
862        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
863
864        child_can_start.set()
865        parent_can_continue.wait()
866
867        self.assertEqual(queue_empty(queue), True)
868        self.assertEqual(queue_full(queue, MAXSIZE), False)
869
870        proc.join()
871        close_queue(queue)
872
873    @classmethod
874    def _test_get(cls, queue, child_can_start, parent_can_continue):
875        child_can_start.wait()
876        #queue.put(1)
877        queue.put(2)
878        queue.put(3)
879        queue.put(4)
880        queue.put(5)
881        parent_can_continue.set()
882
883    def test_get(self):
884        queue = self.Queue()
885        child_can_start = self.Event()
886        parent_can_continue = self.Event()
887
888        proc = self.Process(
889            target=self._test_get,
890            args=(queue, child_can_start, parent_can_continue)
891            )
892        proc.daemon = True
893        proc.start()
894
895        self.assertEqual(queue_empty(queue), True)
896
897        child_can_start.set()
898        parent_can_continue.wait()
899
900        time.sleep(DELTA)
901        self.assertEqual(queue_empty(queue), False)
902
903        # Hangs unexpectedly, remove for now
904        #self.assertEqual(queue.get(), 1)
905        self.assertEqual(queue.get(True, None), 2)
906        self.assertEqual(queue.get(True), 3)
907        self.assertEqual(queue.get(timeout=1), 4)
908        self.assertEqual(queue.get_nowait(), 5)
909
910        self.assertEqual(queue_empty(queue), True)
911
912        get = TimingWrapper(queue.get)
913        get_nowait = TimingWrapper(queue.get_nowait)
914
915        self.assertRaises(pyqueue.Empty, get, False)
916        self.assertTimingAlmostEqual(get.elapsed, 0)
917
918        self.assertRaises(pyqueue.Empty, get, False, None)
919        self.assertTimingAlmostEqual(get.elapsed, 0)
920
921        self.assertRaises(pyqueue.Empty, get_nowait)
922        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
923
924        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
925        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
926
927        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
928        self.assertTimingAlmostEqual(get.elapsed, 0)
929
930        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
931        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
932
933        proc.join()
934        close_queue(queue)
935
936    @classmethod
937    def _test_fork(cls, queue):
938        for i in range(10, 20):
939            queue.put(i)
940        # note that at this point the items may only be buffered, so the
941        # process cannot shutdown until the feeder thread has finished
942        # pushing items onto the pipe.
943
944    def test_fork(self):
945        # Old versions of Queue would fail to create a new feeder
946        # thread for a forked process if the original process had its
947        # own feeder thread.  This test checks that this no longer
948        # happens.
949
950        queue = self.Queue()
951
952        # put items on queue so that main process starts a feeder thread
953        for i in range(10):
954            queue.put(i)
955
956        # wait to make sure thread starts before we fork a new process
957        time.sleep(DELTA)
958
959        # fork process
960        p = self.Process(target=self._test_fork, args=(queue,))
961        p.daemon = True
962        p.start()
963
964        # check that all expected items are in the queue
965        for i in range(20):
966            self.assertEqual(queue.get(), i)
967        self.assertRaises(pyqueue.Empty, queue.get, False)
968
969        p.join()
970        close_queue(queue)
971
972    def test_qsize(self):
973        q = self.Queue()
974        try:
975            self.assertEqual(q.qsize(), 0)
976        except NotImplementedError:
977            self.skipTest('qsize method not implemented')
978        q.put(1)
979        self.assertEqual(q.qsize(), 1)
980        q.put(5)
981        self.assertEqual(q.qsize(), 2)
982        q.get()
983        self.assertEqual(q.qsize(), 1)
984        q.get()
985        self.assertEqual(q.qsize(), 0)
986        close_queue(q)
987
988    @classmethod
989    def _test_task_done(cls, q):
990        for obj in iter(q.get, None):
991            time.sleep(DELTA)
992            q.task_done()
993
    def test_task_done(self):
        # JoinableQueue.join() must block until every enqueued item has
        # been marked processed by a worker via task_done().
        queue = self.JoinableQueue()

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in range(4)]

        for p in workers:
            p.daemon = True
            p.start()

        for i in range(10):
            queue.put(i)

        # Returns only once all 10 items above have been task_done()'d.
        queue.join()

        # One None sentinel per worker unblocks their consume loops.
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()
        close_queue(queue)
1015
    def test_no_import_lock_contention(self):
        # bpo-22853 regression test: using a Queue while the import lock
        # is held (here, at top level of an imported module) must not
        # deadlock the feeder thread; a regression shows up as the
        # q.get(timeout=3) below raising queue.Empty.
        with test.support.temp_cwd():
            module_name = 'imported_by_an_imported_module'
            with open(module_name + '.py', 'w') as f:
                f.write("""if 1:
                    import multiprocessing

                    q = multiprocessing.Queue()
                    q.put('knock knock')
                    q.get(timeout=3)
                    q.close()
                    del q
                """)

            with test.support.DirsOnSysPath(os.getcwd()):
                try:
                    __import__(module_name)
                except pyqueue.Empty:
                    self.fail("Probable regression on import lock contention;"
                              " see Issue #22853")
1036
1037    def test_timeout(self):
1038        q = multiprocessing.Queue()
1039        start = time.monotonic()
1040        self.assertRaises(pyqueue.Empty, q.get, True, 0.200)
1041        delta = time.monotonic() - start
1042        # bpo-30317: Tolerate a delta of 100 ms because of the bad clock
1043        # resolution on Windows (usually 15.6 ms). x86 Windows7 3.x once
1044        # failed because the delta was only 135.8 ms.
1045        self.assertGreaterEqual(delta, 0.100)
1046        close_queue(q)
1047
    def test_queue_feeder_donot_stop_onexc(self):
        # bpo-30414: verify feeder handles exceptions correctly
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        class NotSerializable(object):
            # Pickling fails inside the feeder thread, not in put() itself.
            def __reduce__(self):
                raise AttributeError
        with test.support.captured_stderr():
            q = self.Queue()
            q.put(NotSerializable())
            # The feeder must survive the failed item and deliver this one.
            q.put(True)
            # bpo-30595: use a timeout of 1 second for slow buildbots
            self.assertTrue(q.get(timeout=1.0))
            close_queue(q)

        with test.support.captured_stderr():
            # bpo-33078: verify that the queue size is correctly handled
            # on errors.
            q = self.Queue(maxsize=1)
            q.put(NotSerializable())
            # Would deadlock if the failed item still occupied the one slot.
            q.put(True)
            try:
                self.assertEqual(q.qsize(), 1)
            except NotImplementedError:
                # qsize is not available on all platform as it
                # relies on sem_getvalue
                pass
            # bpo-30595: use a timeout of 1 second for slow buildbots
            self.assertTrue(q.get(timeout=1.0))
            # Check that the size of the queue is correct
            self.assertTrue(q.empty())
            close_queue(q)
1081
    def test_queue_feeder_on_queue_feeder_error(self):
        # bpo-30006: verify feeder handles exceptions using the
        # _on_queue_feeder_error hook.
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        class NotSerializable(object):
            """Mock unserializable object"""
            def __init__(self):
                self.reduce_was_called = False
                self.on_queue_feeder_error_was_called = False

            def __reduce__(self):
                self.reduce_was_called = True
                raise AttributeError

        class SafeQueue(multiprocessing.queues.Queue):
            """Queue with overloaded _on_queue_feeder_error hook"""
            @staticmethod
            def _on_queue_feeder_error(e, obj):
                if (isinstance(e, AttributeError) and
                        isinstance(obj, NotSerializable)):
                    obj.on_queue_feeder_error_was_called = True

        not_serializable_obj = NotSerializable()
        # The captured_stderr reduces the noise in the test report
        with test.support.captured_stderr():
            q = SafeQueue(ctx=multiprocessing.get_context())
            q.put(not_serializable_obj)

            # Verify that q is still functioning correctly
            q.put(True)
            self.assertTrue(q.get(timeout=1.0))

        # Assert that the serialization and the hook have been called correctly.
        # (Both run in this process's feeder thread, so the flag mutations on
        # not_serializable_obj are visible here.)
        self.assertTrue(not_serializable_obj.reduce_was_called)
        self.assertTrue(not_serializable_obj.on_queue_feeder_error_was_called)
1119#
1120#
1121#
1122
class _TestLock(BaseTestCase):
    """Exercise Lock and RLock acquire/release semantics."""

    def test_lock(self):
        """A plain Lock is taken once; a second non-blocking acquire fails."""
        mutex = self.Lock()
        self.assertEqual(mutex.acquire(), True)
        self.assertEqual(mutex.acquire(False), False)
        self.assertEqual(mutex.release(), None)
        # Releasing an unheld lock must raise.
        self.assertRaises((ValueError, threading.ThreadError), mutex.release)

    def test_rlock(self):
        """An RLock nests: three acquires need exactly three releases."""
        mutex = self.RLock()
        for _ in range(3):
            self.assertEqual(mutex.acquire(), True)
        for _ in range(3):
            self.assertEqual(mutex.release(), None)
        # One release too many must raise.
        self.assertRaises((AssertionError, RuntimeError), mutex.release)

    def test_lock_context(self):
        """A Lock works as a context manager."""
        with self.Lock():
            pass
1145
1146
class _TestSemaphore(BaseTestCase):
    """Tests for Semaphore and BoundedSemaphore counters and timeouts."""

    def _test_semaphore(self, sem):
        """Drive *sem* (initial value 2) down to 0 and back up to 2."""
        self.assertReturnsIfImplemented(2, get_value, sem)
        for remaining in (1, 0):
            self.assertEqual(sem.acquire(), True)
            self.assertReturnsIfImplemented(remaining, get_value, sem)
        # Non-blocking acquire on an exhausted semaphore fails and does
        # not change the counter.
        self.assertEqual(sem.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, sem)
        for restored in (1, 2):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(restored, get_value, sem)

    def test_semaphore(self):
        """An unbounded semaphore may be released past its initial value."""
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        for value in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(value, get_value, sem)

    def test_bounded_semaphore(self):
        """A bounded semaphore passes the basic up/down sequence."""
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        """acquire() honours the block/timeout argument combinations."""
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # Non-blocking attempts return immediately whatever the timeout.
        for args in ((False,), (False, None), (False, TIMEOUT1)):
            self.assertEqual(acquire(*args), False)
            self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        # Blocking attempts wait out their timeout before failing.
        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
1199
1200
class _TestCondition(BaseTestCase):
    """Tests for Condition: notify/notify_all/notify(n), wait timeouts,
    wait_for, and interruption of wait()."""

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        # Child body: announce that we are about to wait (release
        # `sleeping`), wait on the condition with an optional timeout,
        # then announce that we woke (release `woken`).
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def assertReachesEventually(self, func, value):
        # Poll func() until it returns `value` or ~10*DELTA has elapsed,
        # then assert the final reading.  NotImplementedError stops the
        # polling (e.g. sem_getvalue() unavailable on this platform).
        for i in range(10):
            try:
                if func() == value:
                    break
            except NotImplementedError:
                break
            time.sleep(DELTA)
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(value, func)

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        # One process and one thread wait; each notify() wakes exactly one.
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        p = self.Process(target=self.f, args=(cond, sleeping, woken))
        p.daemon = True
        p.start()
        self.addCleanup(p.join)

        p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        p.daemon = True
        p.start()
        self.addCleanup(p.join)

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)
        p.join()

    def test_notify_all(self):
        # notify_all() must wake every waiter at once.
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()
            self.addCleanup(p.join)

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()
            self.addCleanup(t.join)

        # wait for them all to sleep
        for i in range(6):
            sleeping.acquire()

        # check they have all timed out
        for i in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()
            self.addCleanup(p.join)

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()
            self.addCleanup(t.join)

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        self.assertReachesEventually(lambda: get_value(woken), 6)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_notify_n(self):
        # notify(n) must wake at most n waiters per call.
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()
            self.addCleanup(p.join)

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()
            self.addCleanup(t.join)

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake some of them up
        cond.acquire()
        cond.notify(n=2)
        cond.release()

        # check 2 have woken
        self.assertReachesEventually(lambda: get_value(woken), 2)

        # wake the rest of them
        cond.acquire()
        cond.notify(n=4)
        cond.release()

        self.assertReachesEventually(lambda: get_value(woken), 6)

        # doesn't do anything more
        cond.acquire()
        cond.notify(n=3)
        cond.release()

        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        # wait(timeout) with no notifier must return False after ~timeout.
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

    @classmethod
    def _test_waitfor_f(cls, cond, state):
        # Child body: reset state to 0, then wait_for state to reach 4;
        # exit code 1 signals failure back to the parent.
        with cond:
            state.value = 0
            cond.notify()
            result = cond.wait_for(lambda : state.value==4)
            if not result or state.value != 4:
                sys.exit(1)

    @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
    def test_waitfor(self):
        # based on test in test/lock_tests.py
        cond = self.Condition()
        state = self.Value('i', -1)

        p = self.Process(target=self._test_waitfor_f, args=(cond, state))
        p.daemon = True
        p.start()

        with cond:
            result = cond.wait_for(lambda : state.value==0)
            self.assertTrue(result)
            self.assertEqual(state.value, 0)

        # Step the shared state up to 4, notifying the child each time.
        for i in range(4):
            time.sleep(0.01)
            with cond:
                state.value += 1
                cond.notify()

        join_process(p)
        self.assertEqual(p.exitcode, 0)

    @classmethod
    def _test_waitfor_timeout_f(cls, cond, state, success, sem):
        # Child body: wait_for a state that never arrives and check the
        # timeout is honoured (within generous tolerances).
        sem.release()
        with cond:
            expected = 0.1
            dt = time.monotonic()
            result = cond.wait_for(lambda : state.value==4, timeout=expected)
            dt = time.monotonic() - dt
            # borrow logic in assertTimeout() from test/lock_tests.py
            if not result and expected * 0.6 < dt < expected * 10.0:
                success.value = True

    @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
    def test_waitfor_timeout(self):
        # based on test in test/lock_tests.py
        cond = self.Condition()
        state = self.Value('i', 0)
        success = self.Value('i', False)
        sem = self.Semaphore(0)

        p = self.Process(target=self._test_waitfor_timeout_f,
                         args=(cond, state, success, sem))
        p.daemon = True
        p.start()
        self.assertTrue(sem.acquire(timeout=TIMEOUT))

        # Only increment 3 times, so state == 4 is never reached.
        for i in range(3):
            time.sleep(0.01)
            with cond:
                state.value += 1
                cond.notify()

        join_process(p)
        self.assertTrue(success.value)

    @classmethod
    def _test_wait_result(cls, c, pid):
        # Child body: notify the parent's wait(), then (on POSIX) send
        # SIGINT so the parent's next wait() raises KeyboardInterrupt.
        with c:
            c.notify()
        time.sleep(1)
        if pid is not None:
            os.kill(pid, signal.SIGINT)

    def test_wait_result(self):
        # wait() must return False on timeout, True when notified, and
        # propagate KeyboardInterrupt when interrupted by SIGINT.
        if isinstance(self, ProcessesMixin) and sys.platform != 'win32':
            pid = os.getpid()
        else:
            pid = None

        c = self.Condition()
        with c:
            self.assertFalse(c.wait(0))
            self.assertFalse(c.wait(0.1))

            p = self.Process(target=self._test_wait_result, args=(c, pid))
            p.start()

            self.assertTrue(c.wait(60))
            if pid is not None:
                self.assertRaises(KeyboardInterrupt, c.wait, 60)

            p.join()
1495
1496
class _TestEvent(BaseTestCase):
    """Tests for Event: is_set/wait before and after set(), clear(), and
    wait() blocking until another process calls set()."""

    @classmethod
    def _test_event(cls, event):
        # Child body: set the event after a delay so that the parent's
        # wait() actually has to block.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # Removed temporarily, due to API shear, this does not
        # work with threading._Event objects. is_set == isSet
        self.assertEqual(event.is_set(), False)

        # Removed, threading.Event.wait() will return the value of the __flag
        # instead of None. API Shear with the semaphore backed mp.Event
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # See note above on the API differences
        # Once set, wait() returns True immediately regardless of timeout.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # A cleared event blocks wait() until the child sets it again.
        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        p.join()
1538
1539#
1540# Tests for Barrier - adapted from tests in test/lock_tests.py
1541#
1542
1543# Many of the tests for threading.Barrier use a list as an atomic
1544# counter: a value is appended to increment the counter, and the
1545# length of the list gives the value.  We use the class DummyList
1546# for the same purpose.
1547
1548class _DummyList(object):
1549
1550    def __init__(self):
1551        wrapper = multiprocessing.heap.BufferWrapper(struct.calcsize('i'))
1552        lock = multiprocessing.Lock()
1553        self.__setstate__((wrapper, lock))
1554        self._lengthbuf[0] = 0
1555
1556    def __setstate__(self, state):
1557        (self._wrapper, self._lock) = state
1558        self._lengthbuf = self._wrapper.create_memoryview().cast('i')
1559
1560    def __getstate__(self):
1561        return (self._wrapper, self._lock)
1562
1563    def append(self, _):
1564        with self._lock:
1565            self._lengthbuf[0] += 1
1566
1567    def __len__(self):
1568        with self._lock:
1569            return self._lengthbuf[0]
1570
1571def _wait():
1572    # A crude wait/yield function not relying on synchronization primitives.
1573    time.sleep(0.01)
1574
1575
class Bunch(object):
    """
    A bunch of threads.
    """
    def __init__(self, namespace, f, args, n, wait_before_exit=False):
        """
        Construct a bunch of `n` threads running the same function `f`.
        If `wait_before_exit` is True, the threads won't terminate until
        do_finish() is called.
        """
        self.f = f
        self.args = args
        self.n = n
        # Shared counters of started/finished workers; namespace supplies
        # an implementation appropriate for threads vs. processes.
        self.started = namespace.DummyList()
        self.finished = namespace.DummyList()
        self._can_exit = namespace.Event()
        if not wait_before_exit:
            self._can_exit.set()

        threads = []
        for i in range(n):
            p = namespace.Process(target=self.task)
            p.daemon = True
            p.start()
            threads.append(p)

        def finalize(threads):
            for p in threads:
                p.join()

        # Join all workers when this Bunch is closed or garbage-collected.
        self._finalizer = weakref.finalize(self, finalize, threads)

    def task(self):
        # Worker body: record the start, run f, record the finish, then
        # optionally wait for permission to exit (see wait_before_exit).
        pid = os.getpid()
        self.started.append(pid)
        try:
            self.f(*self.args)
        finally:
            self.finished.append(pid)
            self._can_exit.wait(30)
            assert self._can_exit.is_set()

    def wait_for_started(self):
        # Block until all n workers have entered task().
        while len(self.started) < self.n:
            _wait()

    def wait_for_finished(self):
        # Block until all n workers have finished running f.
        while len(self.finished) < self.n:
            _wait()

    def do_finish(self):
        # Allow workers created with wait_before_exit=True to terminate.
        self._can_exit.set()

    def close(self):
        # Join all workers now rather than waiting for garbage collection.
        self._finalizer()
1631
1632
class AppendTrue(object):
    """Picklable callable that appends True to a captured container."""

    def __init__(self, obj):
        self.obj = obj

    def __call__(self):
        self.obj.append(True)
1638
1639
class _TestBarrier(BaseTestCase):
    """
    Tests for Barrier objects.
    """
    N = 5                  # number of parties per barrier
    defaultTimeout = 30.0  # XXX Slow Windows buildbots need generous timeout

    def setUp(self):
        # Fresh barrier per test; the generous timeout keeps one stuck
        # party from hanging the whole run.
        self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout)

    def tearDown(self):
        # Break the barrier so any straggling waiters are released.
        self.barrier.abort()
        self.barrier = None

    def DummyList(self):
        # Return an append-only "atomic counter" list appropriate for
        # self.TYPE (see the module-level _DummyList for processes).
        if self.TYPE == 'threads':
            return []
        elif self.TYPE == 'manager':
            return self.manager.list()
        else:
            return _DummyList()

    def run_threads(self, f, args):
        # Run f(*args) in N-1 children plus the current thread/process,
        # so exactly N parties reach the barrier.
        b = Bunch(self, f, args, self.N-1)
        try:
            f(*args)
            b.wait_for_finished()
        finally:
            b.close()

    @classmethod
    def multipass(cls, barrier, results, n):
        # Cross the barrier 2*n times, checking after each crossing that
        # every party completed the previous phase (lockstep behaviour).
        m = barrier.parties
        assert m == cls.N
        for i in range(n):
            results[0].append(True)
            assert len(results[1]) == i * m
            barrier.wait()
            results[1].append(True)
            assert len(results[0]) == (i + 1) * m
            barrier.wait()
        try:
            assert barrier.n_waiting == 0
        except NotImplementedError:
            pass
        assert not barrier.broken

    def test_barrier(self, passes=1):
        """
        Test that a barrier is passed in lockstep
        """
        results = [self.DummyList(), self.DummyList()]
        self.run_threads(self.multipass, (self.barrier, results, passes))

    def test_barrier_10(self):
        """
        Test that a barrier works for 10 consecutive runs
        """
        return self.test_barrier(10)

    @classmethod
    def _test_wait_return_f(cls, barrier, queue):
        # Each party reports its wait() return value (a party index).
        res = barrier.wait()
        queue.put(res)

    def test_wait_return(self):
        """
        test the return value from barrier.wait
        """
        queue = self.Queue()
        self.run_threads(self._test_wait_return_f, (self.barrier, queue))
        results = [queue.get() for i in range(self.N)]
        # Exactly one party receives index 0.
        self.assertEqual(results.count(0), 1)
        close_queue(queue)

    @classmethod
    def _test_action_f(cls, barrier, results):
        # The barrier's action must have run exactly once before any
        # party is released.
        barrier.wait()
        if len(results) != 1:
            raise RuntimeError

    def test_action(self):
        """
        Test the 'action' callback
        """
        results = self.DummyList()
        barrier = self.Barrier(self.N, action=AppendTrue(results))
        self.run_threads(self._test_action_f, (barrier, results))
        self.assertEqual(len(results), 1)

    @classmethod
    def _test_abort_f(cls, barrier, results1, results2):
        # One chosen party aborts the barrier; the others must see
        # BrokenBarrierError on their second wait().
        try:
            i = barrier.wait()
            if i == cls.N//2:
                raise RuntimeError
            barrier.wait()
            results1.append(True)
        except threading.BrokenBarrierError:
            results2.append(True)
        except RuntimeError:
            barrier.abort()

    def test_abort(self):
        """
        Test that an abort will put the barrier in a broken state
        """
        results1 = self.DummyList()
        results2 = self.DummyList()
        self.run_threads(self._test_abort_f,
                         (self.barrier, results1, results2))
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertTrue(self.barrier.broken)

    @classmethod
    def _test_reset_f(cls, barrier, results1, results2, results3):
        # One chosen party resets the barrier while the rest are waiting;
        # the waiters must see BrokenBarrierError, then everyone passes
        # the (now reset) barrier normally.
        i = barrier.wait()
        if i == cls.N//2:
            # Wait until the other threads are all in the barrier.
            while barrier.n_waiting < cls.N-1:
                time.sleep(0.001)
            barrier.reset()
        else:
            try:
                barrier.wait()
                results1.append(True)
            except threading.BrokenBarrierError:
                results2.append(True)
        # Now, pass the barrier again
        barrier.wait()
        results3.append(True)

    def test_reset(self):
        """
        Test that a 'reset' on a barrier frees the waiting threads
        """
        results1 = self.DummyList()
        results2 = self.DummyList()
        results3 = self.DummyList()
        self.run_threads(self._test_reset_f,
                         (self.barrier, results1, results2, results3))
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    @classmethod
    def _test_abort_and_reset_f(cls, barrier, barrier2,
                                results1, results2, results3):
        # Break the barrier as in _test_abort_f, then reset it (guarded
        # by a second barrier) and verify it is usable again.
        try:
            i = barrier.wait()
            if i == cls.N//2:
                raise RuntimeError
            barrier.wait()
            results1.append(True)
        except threading.BrokenBarrierError:
            results2.append(True)
        except RuntimeError:
            barrier.abort()
        # Synchronize and reset the barrier.  Must synchronize first so
        # that everyone has left it when we reset, and after so that no
        # one enters it before the reset.
        if barrier2.wait() == cls.N//2:
            barrier.reset()
        barrier2.wait()
        barrier.wait()
        results3.append(True)

    def test_abort_and_reset(self):
        """
        Test that a barrier can be reset after being broken.
        """
        results1 = self.DummyList()
        results2 = self.DummyList()
        results3 = self.DummyList()
        barrier2 = self.Barrier(self.N)

        self.run_threads(self._test_abort_and_reset_f,
                         (self.barrier, barrier2, results1, results2, results3))
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    @classmethod
    def _test_timeout_f(cls, barrier, results):
        # One chosen party arrives 1 s late, so everyone's wait(0.5)
        # must break the barrier.
        i = barrier.wait()
        if i == cls.N//2:
            # One thread is late!
            time.sleep(1.0)
        try:
            barrier.wait(0.5)
        except threading.BrokenBarrierError:
            results.append(True)

    def test_timeout(self):
        """
        Test wait(timeout)
        """
        results = self.DummyList()
        self.run_threads(self._test_timeout_f, (self.barrier, results))
        self.assertEqual(len(results), self.barrier.parties)

    @classmethod
    def _test_default_timeout_f(cls, barrier, results):
        # Same as _test_timeout_f, but relying on the barrier's own
        # (constructor-supplied) default timeout.
        i = barrier.wait(cls.defaultTimeout)
        if i == cls.N//2:
            # One thread is later than the default timeout
            time.sleep(1.0)
        try:
            barrier.wait()
        except threading.BrokenBarrierError:
            results.append(True)

    def test_default_timeout(self):
        """
        Test the barrier's default timeout
        """
        barrier = self.Barrier(self.N, timeout=0.5)
        results = self.DummyList()
        self.run_threads(self._test_default_timeout_f, (barrier, results))
        self.assertEqual(len(results), barrier.parties)

    def test_single_thread(self):
        # A one-party barrier never blocks.
        b = self.Barrier(1)
        b.wait()
        b.wait()

    @classmethod
    def _test_thousand_f(cls, barrier, passes, conn, lock):
        # Cross the barrier `passes` times, reporting the pass number
        # over the pipe (under a lock so sends do not interleave).
        for i in range(passes):
            barrier.wait()
            with lock:
                conn.send(i)

    def test_thousand(self):
        # Stress test: N parties make 1000 lockstep passes; the parent
        # must receive each pass number exactly N times in order.
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        passes = 1000
        lock = self.Lock()
        conn, child_conn = self.Pipe(False)
        for j in range(self.N):
            p = self.Process(target=self._test_thousand_f,
                           args=(self.barrier, passes, child_conn, lock))
            p.start()
            self.addCleanup(p.join)

        for i in range(passes):
            for j in range(self.N):
                self.assertEqual(conn.recv(), i)
1889
1890#
1891#
1892#
1893
class _TestValue(BaseTestCase):
    """Tests for shared single-value wrappers (Value / RawValue)."""

    ALLOWED_TYPES = ('processes',)

    # Each entry: (typecode, initial value, value written by the child).
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('q', 2 ** 33, 2 ** 34),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        # Child-side helper: overwrite each shared value with the third
        # entry of its spec so the parent can verify the write propagated.
        for shared, (_, _, new) in zip(values, cls.codes_values):
            shared.value = new


    def test_value(self, raw=False):
        factory = self.RawValue if raw else self.Value
        values = [factory(code, value)
                  for code, value, _ in self.codes_values]

        # The parent sees the initial values...
        for shared, (_, initial, _) in zip(values, self.codes_values):
            self.assertEqual(shared.value, initial)

        proc = self.Process(target=self._test, args=(values,))
        proc.daemon = True
        proc.start()
        proc.join()

        # ...and, after the child ran, the values it wrote.
        for shared, (_, _, new) in zip(values, self.codes_values):
            self.assertEqual(shared.value, new)

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        # Default and lock=None both allocate a lock automatically.
        for kwargs in ({}, {'lock': None}):
            sv = self.Value('i', 5, **kwargs)
            sv.get_lock()
            sv.get_obj()

        # An explicitly supplied lock object is used as-is.
        lock = self.Lock()
        sv = self.Value('i', 5, lock=lock)
        sv.get_obj()
        self.assertEqual(lock, sv.get_lock())

        # lock=False yields a wrapper without the locking API at all.
        plain = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(plain, 'get_lock'))
        self.assertFalse(hasattr(plain, 'get_obj'))

        # Any other value for lock= is rejected.
        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        raw = self.RawValue('i', 5)
        self.assertFalse(hasattr(raw, 'get_lock'))
        self.assertFalse(hasattr(raw, 'get_obj'))
1962
1963
class _TestArray(BaseTestCase):
    """Tests for shared arrays (Array / RawArray)."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        # Replace seq in place by its running (prefix) sums.
        for idx in range(1, len(seq)):
            seq[idx] += seq[idx - 1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        arr = (self.RawArray if raw else self.Array)('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Slice assignment must accept an array.array as well.
        arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])

        self.assertEqual(list(arr[:]), seq)

        # Compute the expected prefix sums locally, then have a child
        # process apply the same transformation to the shared array.
        self.f(seq)

        p = self.Process(target=self.f, args=(arr,))
        p.daemon = True
        p.start()
        p.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        size = 10
        # Test for zeroing (see issue #11675).
        # The repetition below strengthens the test by increasing the chances
        # of previously allocated non-zero memory being used for the new array
        # on the 2nd and 3rd loops.
        for _ in range(3):
            shared = self.Array('i', size)
            self.assertEqual(len(shared), size)
            self.assertEqual(list(shared), [0] * size)
            shared[:] = range(10)
            self.assertEqual(list(shared), list(range(10)))
            del shared

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        # Default and lock=None both allocate a lock automatically.
        for kwargs in ({}, {'lock': None}):
            sa = self.Array('i', list(range(10)), **kwargs)
            sa.get_lock()
            sa.get_obj()

        # An explicitly supplied lock object is used as-is.
        lock = self.Lock()
        sa = self.Array('i', list(range(10)), lock=lock)
        sa.get_obj()
        self.assertEqual(lock, sa.get_lock())

        # lock=False yields a wrapper without the locking API at all.
        plain = self.Array('i', range(10), lock=False)
        self.assertFalse(hasattr(plain, 'get_lock'))
        self.assertFalse(hasattr(plain, 'get_obj'))
        # Any other value for lock= is rejected.
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        raw = self.RawArray('i', range(10))
        self.assertFalse(hasattr(raw, 'get_lock'))
        self.assertFalse(hasattr(raw, 'get_obj'))
2042
2043#
2044#
2045#
2046
class _TestContainers(BaseTestCase):
    """Tests for manager-backed container proxies (list, dict, Namespace).

    NOTE: the exact order of operations (including the bare ``del`` of
    proxies) matters here -- several tests check that the referent in the
    manager process stays alive while other proxies still reference it.
    """

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        a = self.list(list(range(10)))
        self.assertEqual(a[:], list(range(10)))

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(range(5)))
        self.assertEqual(b[:], list(range(5)))

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2,3,4])

        b *= 2
        self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])

        self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        self.assertEqual(a[:], list(range(10)))

        # Building a proxy list from other proxies snapshots their contents.
        d = [a, b]
        e = self.list(d)
        self.assertEqual(
            [element[:] for element in e],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[0][:], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello'])

    def test_list_iter(self):
        a = self.list(list(range(10)))
        it = iter(a)
        self.assertEqual(list(it), list(range(10)))
        self.assertEqual(list(it), [])  # exhausted
        # list modified during iteration
        it = iter(a)
        a[0] = 100
        self.assertEqual(next(it), 100)

    def test_list_proxy_in_list(self):
        a = self.list([self.list(range(3)) for _i in range(3)])
        self.assertEqual([inner[:] for inner in a], [[0, 1, 2]] * 3)

        # Mutations through a nested proxy must be visible everywhere.
        a[0][-1] = 55
        self.assertEqual(a[0][:], [0, 1, 55])
        for i in range(1, 3):
            self.assertEqual(a[i][:], [0, 1, 2])

        self.assertEqual(a[1].pop(), 2)
        self.assertEqual(len(a[1]), 2)
        for i in range(0, 3, 2):
            self.assertEqual(len(a[i]), 3)

        del a

        # A proxy list containing itself must not break teardown.
        b = self.list()
        b.append(b)
        del b

    def test_dict(self):
        d = self.dict()
        indices = list(range(65, 70))
        for i in indices:
            d[i] = chr(i)
        self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
        self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])

    def test_dict_iter(self):
        d = self.dict()
        indices = list(range(65, 70))
        for i in indices:
            d[i] = chr(i)
        it = iter(d)
        self.assertEqual(list(it), indices)
        self.assertEqual(list(it), [])  # exhausted
        # dictionary changed size during iteration
        it = iter(d)
        d.clear()
        self.assertRaises(RuntimeError, next, it)

    def test_dict_proxy_nested(self):
        pets = self.dict(ferrets=2, hamsters=4)
        supplies = self.dict(water=10, feed=3)
        d = self.dict(pets=pets, supplies=supplies)

        self.assertEqual(supplies['water'], 10)
        self.assertEqual(d['supplies']['water'], 10)

        d['supplies']['blankets'] = 5
        self.assertEqual(supplies['blankets'], 5)
        self.assertEqual(d['supplies']['blankets'], 5)

        d['supplies']['water'] = 7
        self.assertEqual(supplies['water'], 7)
        self.assertEqual(d['supplies']['water'], 7)

        # Dropping the outer proxies must not kill the nested referents,
        # since d still holds proxies to them in the manager process.
        del pets
        del supplies
        self.assertEqual(d['pets']['ferrets'], 2)
        d['supplies']['blankets'] = 11
        self.assertEqual(d['supplies']['blankets'], 11)

        pets = d['pets']
        supplies = d['supplies']
        supplies['water'] = 7
        self.assertEqual(supplies['water'], 7)
        self.assertEqual(d['supplies']['water'], 7)

        # Clearing d must not invalidate independently-held proxies.
        d.clear()
        self.assertEqual(len(d), 0)
        self.assertEqual(supplies['water'], 7)
        self.assertEqual(pets['hamsters'], 4)

        l = self.list([pets, supplies])
        l[0]['marmots'] = 1
        self.assertEqual(pets['marmots'], 1)
        self.assertEqual(l[0]['marmots'], 1)

        del pets
        del supplies
        self.assertEqual(l[0]['marmots'], 1)

        # Plain (non-proxy) containers stay plain when nested in a proxy.
        outer = self.list([[88, 99], l])
        self.assertIsInstance(outer[0], list)  # Not a ListProxy
        self.assertEqual(outer[-1][-1]['feed'], 3)

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # repr hides attributes starting with '_'.
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertTrue(not hasattr(n, 'job'))
2191
2192#
2193#
2194#
2195
def sqr(x, wait=0.0):
    """Sleep for *wait* seconds, then return the square of *x*."""
    time.sleep(wait)
    return x * x
2199
def mul(x, y):
    """Return the product of *x* and *y*."""
    return x * y
2202
def raise_large_valuerror(wait):
    """After *wait* seconds, raise a ValueError carrying a ~1 MiB message."""
    time.sleep(wait)
    raise ValueError("x" * 1024**2)
2206
def identity(x):
    """Return *x* unchanged."""
    return x
2209
class CountedObject(object):
    """Object that tracks how many of its instances are currently alive.

    Used to detect leaked references: n_instances should drop back to
    zero once all instances have been released.
    """

    # Class-wide count of live instances.
    n_instances = 0

    def __new__(cls):
        cls.n_instances += 1
        return super().__new__(cls)

    def __del__(self):
        type(self).n_instances -= 1
2219
2220class SayWhenError(ValueError): pass
2221
def exception_throwing_generator(total, when):
    """Yield 0 .. total-1, raising SayWhenError at index *when*.

    A *when* of -1 raises before anything is yielded at all.
    """
    if when == -1:
        raise SayWhenError("Somebody said when")
    for value in range(total):
        if value == when:
            raise SayWhenError("Somebody said when")
        yield value
2229
2230
class _TestPool(BaseTestCase):
    """Tests for the Pool API across the flavours provided by self.Pool
    (process pool, thread pool or manager pool, depending on self.TYPE).

    A single 4-worker pool is shared by most tests via setUpClass/
    tearDownClass; tests needing other parameters create their own pool.
    """

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        cls.pool = cls.Pool(4)

    @classmethod
    def tearDownClass(cls):
        cls.pool.terminate()
        cls.pool.join()
        cls.pool = None
        super().tearDownClass()

    def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
        self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
                         list(map(sqr, list(range(100)))))

    def test_starmap(self):
        psmap = self.pool.starmap
        tuples = list(zip(range(10), range(9,-1, -1)))
        self.assertEqual(psmap(mul, tuples),
                         list(itertools.starmap(mul, tuples)))
        tuples = list(zip(range(100), range(99,-1, -1)))
        self.assertEqual(psmap(mul, tuples, chunksize=20),
                         list(itertools.starmap(mul, tuples)))

    def test_starmap_async(self):
        tuples = list(zip(range(100), range(99,-1, -1)))
        self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
                         list(itertools.starmap(mul, tuples)))

    def test_map_async(self):
        self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(),
                         list(map(sqr, list(range(10)))))

    def test_map_async_callbacks(self):
        # callback receives the result list; error_callback the exception.
        call_args = self.manager.list() if self.TYPE == 'manager' else []
        self.pool.map_async(int, ['1'],
                            callback=call_args.append,
                            error_callback=call_args.append).wait()
        self.assertEqual(1, len(call_args))
        self.assertEqual([1], call_args[0])
        self.pool.map_async(int, ['a'],
                            callback=call_args.append,
                            error_callback=call_args.append).wait()
        self.assertEqual(2, len(call_args))
        self.assertIsInstance(call_args[1], ValueError)

    # NOTE(review): method name has a long-standing typo ("unplicklable");
    # kept as-is since renaming would change the test identifier.
    def test_map_unplicklable(self):
        # Issue #19425 -- failure to pickle should not cause a hang
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        class A(object):
            def __reduce__(self):
                raise RuntimeError('cannot pickle')
        with self.assertRaises(RuntimeError):
            self.pool.map(sqr, [A()]*10)

    def test_map_chunksize(self):
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_map_handle_iterable_exception(self):
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        # SayWhenError seen at the very first of the iterable
        with self.assertRaises(SayWhenError):
            self.pool.map(sqr, exception_throwing_generator(1, -1), 1)
        # again, make sure it's reentrant
        with self.assertRaises(SayWhenError):
            self.pool.map(sqr, exception_throwing_generator(1, -1), 1)

        with self.assertRaises(SayWhenError):
            self.pool.map(sqr, exception_throwing_generator(10, 3), 1)

        # An iterable that reports len() but fails on first __next__().
        class SpecialIterable:
            def __iter__(self):
                return self
            def __next__(self):
                raise SayWhenError
            def __len__(self):
                return 1
        with self.assertRaises(SayWhenError):
            self.pool.map(sqr, SpecialIterable(), 1)
        with self.assertRaises(SayWhenError):
            self.pool.map(sqr, SpecialIterable(), 1)

    def test_async(self):
        # TimingWrapper records how long res.get() blocked.
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(it), list(map(sqr, list(range(10)))))

        it = self.pool.imap(sqr, list(range(10)))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

    def test_imap_handle_iterable_exception(self):
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        # SayWhenError seen at the very first of the iterable
        it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1)
        self.assertRaises(SayWhenError, it.__next__)
        # again, make sure it's reentrant
        it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1)
        self.assertRaises(SayWhenError, it.__next__)

        it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
        for i in range(3):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, it.__next__)

        # SayWhenError seen at start of problematic chunk's results
        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2)
        for i in range(6):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, it.__next__)
        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4)
        for i in range(4):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, it.__next__)

    def test_imap_unordered(self):
        it = self.pool.imap_unordered(sqr, list(range(10)))
        self.assertEqual(sorted(it), list(map(sqr, list(range(10)))))

        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=100)
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

    def test_imap_unordered_handle_iterable_exception(self):
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        # SayWhenError seen at the very first of the iterable
        it = self.pool.imap_unordered(sqr,
                                      exception_throwing_generator(1, -1),
                                      1)
        self.assertRaises(SayWhenError, it.__next__)
        # again, make sure it's reentrant
        it = self.pool.imap_unordered(sqr,
                                      exception_throwing_generator(1, -1),
                                      1)
        self.assertRaises(SayWhenError, it.__next__)

        it = self.pool.imap_unordered(sqr,
                                      exception_throwing_generator(10, 3),
                                      1)
        expected_values = list(map(sqr, list(range(10))))
        with self.assertRaises(SayWhenError):
            # imap_unordered makes it difficult to anticipate the SayWhenError
            for i in range(10):
                value = next(it)
                self.assertIn(value, expected_values)
                expected_values.remove(value)

        it = self.pool.imap_unordered(sqr,
                                      exception_throwing_generator(20, 7),
                                      2)
        expected_values = list(map(sqr, list(range(20))))
        with self.assertRaises(SayWhenError):
            for i in range(20):
                value = next(it)
                self.assertIn(value, expected_values)
                expected_values.remove(value)

    def test_make_pool(self):
        # Non-positive worker counts must be rejected (wrapped in a
        # RemoteError when the pool lives in a manager process).
        expected_error = (RemoteError if self.TYPE == 'manager'
                          else ValueError)

        self.assertRaises(expected_error, self.Pool, -1)
        self.assertRaises(expected_error, self.Pool, 0)

        if self.TYPE != 'manager':
            p = self.Pool(3)
            try:
                self.assertEqual(3, len(p._pool))
            finally:
                p.close()
                p.join()

    def test_terminate(self):
        result = self.pool.map_async(
            time.sleep, [0.1 for i in range(10000)], chunksize=1
            )
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        # Sanity check the pool didn't wait for all tasks to finish
        self.assertLess(join.elapsed, 2.0)

    def test_empty_iterable(self):
        # See Issue 12157
        p = self.Pool(1)

        self.assertEqual(p.map(sqr, []), [])
        self.assertEqual(list(p.imap(sqr, [])), [])
        self.assertEqual(list(p.imap_unordered(sqr, [])), [])
        self.assertEqual(p.map_async(sqr, []).get(), [])

        p.close()
        p.join()

    def test_context(self):
        # Leaving the with-block terminates the pool, so further use fails.
        if self.TYPE == 'processes':
            L = list(range(10))
            expected = [sqr(i) for i in L]
            with self.Pool(2) as p:
                r = p.map_async(sqr, L)
                self.assertEqual(r.get(), expected)
            p.join()
            self.assertRaises(ValueError, p.map_async, sqr, L)

    @classmethod
    def _test_traceback(cls):
        raise RuntimeError(123) # some comment

    def test_traceback(self):
        # We want ensure that the traceback from the child process is
        # contained in the traceback raised in the main process.
        if self.TYPE == 'processes':
            with self.Pool(1) as p:
                try:
                    p.apply(self._test_traceback)
                except Exception as e:
                    exc = e
                else:
                    self.fail('expected RuntimeError')
            p.join()
            self.assertIs(type(exc), RuntimeError)
            self.assertEqual(exc.args, (123,))
            cause = exc.__cause__
            self.assertIs(type(cause), multiprocessing.pool.RemoteTraceback)
            self.assertIn('raise RuntimeError(123) # some comment', cause.tb)

            with test.support.captured_stderr() as f1:
                try:
                    raise exc
                except RuntimeError:
                    sys.excepthook(*sys.exc_info())
            self.assertIn('raise RuntimeError(123) # some comment',
                          f1.getvalue())
            # _helper_reraises_exception should not make the error
            # a remote exception
            with self.Pool(1) as p:
                try:
                    p.map(sqr, exception_throwing_generator(1, -1), 1)
                except Exception as e:
                    exc = e
                else:
                    self.fail('expected SayWhenError')
                self.assertIs(type(exc), SayWhenError)
                self.assertIs(exc.__cause__, None)
            p.join()

    @classmethod
    def _test_wrapped_exception(cls):
        raise RuntimeError('foo')

    def test_wrapped_exception(self):
        # Issue #20980: Should not wrap exception when using thread pool
        with self.Pool(1) as p:
            with self.assertRaises(RuntimeError):
                p.apply(self._test_wrapped_exception)
        p.join()

    def test_map_no_failfast(self):
        # Issue #23992: the fail-fast behaviour when an exception is raised
        # during map() would make Pool.join() deadlock, because a worker
        # process would fill the result queue (after the result handler thread
        # terminated, hence not draining it anymore).

        t_start = time.monotonic()

        with self.assertRaises(ValueError):
            with self.Pool(2) as p:
                try:
                    p.map(raise_large_valuerror, [0, 1])
                finally:
                    time.sleep(0.5)
                    p.close()
                    p.join()

        # check that we indeed waited for all jobs
        self.assertGreater(time.monotonic() - t_start, 0.9)

    def test_release_task_refs(self):
        # Issue #29861: task arguments and results should not be kept
        # alive after we are done with them.
        objs = [CountedObject() for i in range(10)]
        refs = [weakref.ref(o) for o in objs]
        self.pool.map(identity, objs)

        del objs
        time.sleep(DELTA)  # let threaded cleanup code run
        self.assertEqual(set(wr() for wr in refs), {None})
        # With a process pool, copies of the objects are returned, check
        # they were released too.
        self.assertEqual(CountedObject.n_instances, 0)
2557
2558
def raising():
    """Unconditionally raise KeyError('key')."""
    raise KeyError("key")
2561
def unpickleable_result():
    """Return a lambda -- a result the pool's result handler cannot pickle."""
    return lambda: 42
2564
class _TestPoolWorkerErrors(BaseTestCase):
    """Error propagation from pool workers back to the parent."""

    ALLOWED_TYPES = ('processes', )

    def test_async_error_callback(self):
        # An exception raised in the worker must reach error_callback
        # and also re-raise from AsyncResult.get().
        pool = multiprocessing.Pool(2)

        captured = [None]
        def on_error(exc):
            captured[0] = exc

        res = pool.apply_async(raising, error_callback=on_error)
        self.assertRaises(KeyError, res.get)
        self.assertTrue(captured[0])
        self.assertIsInstance(captured[0], KeyError)

        pool.close()
        pool.join()

    def test_unpickleable_result(self):
        from multiprocessing.pool import MaybeEncodingError
        pool = multiprocessing.Pool(2)

        # Make sure we don't lose pool processes because of encoding errors.
        for _ in range(20):
            captured = [None]
            def on_error(exc):
                captured[0] = exc

            res = pool.apply_async(unpickleable_result, error_callback=on_error)
            self.assertRaises(MaybeEncodingError, res.get)
            wrapped = captured[0]
            self.assertTrue(wrapped)
            self.assertIsInstance(captured[0], MaybeEncodingError)
            self.assertIsNotNone(wrapped.exc)
            self.assertIsNotNone(wrapped.value)

        pool.close()
        pool.join()
2604
class _TestPoolWorkerLifetime(BaseTestCase):
    """Tests for maxtasksperchild worker recycling.

    NOTE(review): these tests poke the Pool's private API (_pool,
    _repopulate_pool), so they are tightly coupled to the implementation.
    """

    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        p = multiprocessing.Pool(3, maxtasksperchild=10)
        self.assertEqual(3, len(p._pool))
        origworkerpids = [w.pid for w in p._pool]
        # Run many tasks so each worker gets replaced (hopefully)
        results = []
        for i in range(100):
            results.append(p.apply_async(sqr, (i, )))
        # Fetch the results and verify we got the right answers,
        # also ensuring all the tasks have completed.
        for (j, res) in enumerate(results):
            self.assertEqual(res.get(), sqr(j))
        # Refill the pool
        p._repopulate_pool()
        # Wait until all workers are alive
        # (countdown * DELTA = 5 seconds max startup process time)
        countdown = 50
        while countdown and not all(w.is_alive() for w in p._pool):
            countdown -= 1
            time.sleep(DELTA)
        finalworkerpids = [w.pid for w in p._pool]
        # All pids should be assigned.  See issue #7805.
        self.assertNotIn(None, origworkerpids)
        self.assertNotIn(None, finalworkerpids)
        # Finally, check that the worker pids have changed
        self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
        p.close()
        p.join()

    def test_pool_worker_lifetime_early_close(self):
        # Issue #10332: closing a pool whose workers have limited lifetimes
        # before all the tasks completed would make join() hang.
        p = multiprocessing.Pool(3, maxtasksperchild=1)
        results = []
        for i in range(6):
            results.append(p.apply_async(sqr, (i, 0.3)))
        p.close()
        p.join()
        # check the results
        for (j, res) in enumerate(results):
            self.assertEqual(res.get(), sqr(j))
2649
2650#
2651# Test of creating a customized manager class
2652#
2653
2654from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
2655
class FooBar(object):
    """Target class for the customized-manager proxy tests below."""

    def f(self):
        return 'f()'

    def g(self):
        # Used to verify that exceptions cross the proxy boundary.
        raise ValueError

    def _h(self):
        # Underscore method: hidden unless explicitly exposed.
        return '_h()'
2663
def baz():
    """Yield the squares 0, 1, 4, ..., 81."""
    for n in range(10):
        yield n * n
2667
class IteratorProxy(BaseProxy):
    # Proxy that exposes only __next__ on the referent, making the remote
    # object iterable from the client side.
    _exposed_ = ('__next__',)
    def __iter__(self):
        return self
    def __next__(self):
        # Forward the call to the referent in the manager process.
        return self._callmethod('__next__')
2674
class MyManager(BaseManager):
    # Custom manager used to test BaseManager.register() options.
    pass

# 'Foo' exposes the default (public) methods; 'Bar' explicitly exposes
# f and the normally-hidden _h; 'baz' wraps its iterator result in
# IteratorProxy.
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
2681
2682
class _TestMyManager(BaseTestCase):
    """Tests for a customized BaseManager subclass (MyManager above)."""

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()
        self.common(manager)
        manager.shutdown()

        # If the manager process exited cleanly then the exitcode
        # will be zero.  Otherwise (after a short timeout)
        # terminate() is used, resulting in an exitcode of -SIGTERM.
        self.assertEqual(manager._process.exitcode, 0)

    def test_mymanager_context(self):
        with MyManager() as manager:
            self.common(manager)
        # bpo-30356: BaseManager._finalize_manager() sends SIGTERM
        # to the manager process if it takes longer than 1 second to stop.
        self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM))

    def test_mymanager_context_prestarted(self):
        manager = MyManager()
        manager.start()
        with manager:
            self.common(manager)
        self.assertEqual(manager._process.exitcode, 0)

    def common(self, manager):
        # Shared body for the three tests above: exercise every
        # registered type and check which methods each proxy exposes.
        foo = manager.Foo()
        bar = manager.Bar()
        baz = manager.baz()

        foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
        bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]

        # Default exposure: public methods only; explicit exposure wins.
        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        # Calling an unexposed method through _callmethod fails remotely.
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        self.assertEqual(list(baz), [i*i for i in range(10)])
2734
2735
2736#
2737# Test of connecting to a remote server and using xmlrpclib for serialization
2738#
2739
# Queue living in the server process; handed out by reference through
# QueueManager.get_queue() below.
_queue = pyqueue.Queue()
def get_queue():
    """Return the module-level queue (exposed through QueueManager)."""
    return _queue
2743
class QueueManager(BaseManager):
    '''manager class used by server process'''
# Serves the module-level queue defined above.
QueueManager.register('get_queue', callable=get_queue)
2747
class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''
# No callable: this manager only connects to an existing server.
QueueManager2.register('get_queue')
2751
2752
# Serializer name passed to every manager below; selects XML-RPC
# marshalling instead of the default pickle-based serialization.
SERIALIZER = 'xmlrpclib'
2754
class _TestRemoteManager(BaseTestCase):
    """Connect to a manager server from another process, using the
    xmlrpclib serializer instead of pickle."""

    ALLOWED_TYPES = ('manager',)
    values = ['hello world', None, True, 2.25,
              'hall\xe5 v\xe4rlden',
              '\u043f\u0440\u0438\u0432\u0456\u0442 \u0441\u0432\u0456\u0442',
              b'hall\xe5 v\xe4rlden',
             ]
    result = values[:]

    @classmethod
    def _putter(cls, address, authkey):
        # Runs in a child process: connect back to the server and
        # deposit the test values on the shared queue.
        remote = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        remote.connect()
        # Note that xmlrpclib will deserialize object as a list not a tuple
        remote.get_queue().put(tuple(cls.values))

    def test_remote(self):
        authkey = os.urandom(32)

        server = QueueManager(
            address=(test.support.HOST, 0), authkey=authkey,
            serializer=SERIALIZER
            )
        server.start()
        self.addCleanup(server.shutdown)

        worker = self.Process(target=self._putter,
                              args=(server.address, authkey))
        worker.daemon = True
        worker.start()

        client = QueueManager2(
            address=server.address, authkey=authkey, serializer=SERIALIZER
            )
        client.connect()
        queue = client.get_queue()

        self.assertEqual(queue.get(), self.result)

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del queue
2802
class _TestManagerRestart(BaseTestCase):
    """Check that a manager can be restarted quickly on the same address."""

    @classmethod
    def _putter(cls, address, authkey):
        # Child-process helper: connect to the server and put one item.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        """Start a manager, use it, shut it down, then immediately rebind
        a new manager to the same address."""
        authkey = os.urandom(32)
        manager = QueueManager(
            address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
        try:
            # get_server() is only used here to learn the concrete address
            # chosen for port 0.
            srvr = manager.get_server()
            addr = srvr.address
            # Close the connection.Listener socket which gets opened as a part
            # of manager.get_server(). It's not needed for the test.
            srvr.listener.close()
            manager.start()

            p = self.Process(target=self._putter, args=(manager.address, authkey))
            p.start()
            p.join()
            queue = manager.get_queue()
            self.assertEqual(queue.get(), 'hello world')
            del queue
        finally:
            # The 'shutdown' attribute only exists after a successful
            # start(), hence the hasattr() guard.
            if hasattr(manager, "shutdown"):
                manager.shutdown()

        # Rebind a fresh manager to the address just released.
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        try:
            manager.start()
            self.addCleanup(manager.shutdown)
        except OSError as e:
            if e.errno != errno.EADDRINUSE:
                raise
            # Retry after some time, in case the old socket was lingering
            # (sporadic failure on buildbots)
            time.sleep(1.0)
            # NOTE(review): this retry creates the manager but never calls
            # start() again, so the restart is not re-attempted — presumably
            # intentional best-effort; confirm against upstream history.
            manager = QueueManager(
                address=addr, authkey=authkey, serializer=SERIALIZER)
            if hasattr(manager, "shutdown"):
                self.addCleanup(manager.shutdown)
2850
2851#
2852#
2853#
2854
# Empty bytes message used to tell echoing child processes to exit.
SENTINEL = latin('')
2856
class _TestConnection(BaseTestCase):
    """Tests for Connection objects returned by Pipe()."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        # Child helper: echo every bytes message back until SENTINEL arrives.
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        """Exercise send/recv, send_bytes/recv_bytes(_into) and poll()."""
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # recv_bytes_into() with a large-enough buffer.
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # recv_bytes_into() with a byte offset into the buffer.
            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # A too-small buffer must raise BufferTooShort carrying the
            # whole message in args.
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(-1), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)
        time.sleep(.1)

        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        """Pipe(duplex=False) gives a read-only and a write-only end."""
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            self.assertRaises(OSError, reader.send, 2)
            self.assertRaises(OSError, writer.recv)
            self.assertRaises(OSError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        """send_bytes() offset/size arguments and their error cases."""
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Out-of-range offset/size combinations must raise ValueError.
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)

    @classmethod
    def _is_fd_assigned(cls, fd):
        # Return True if *fd* is an open file descriptor in this process.
        try:
            os.fstat(fd)
        except OSError as e:
            if e.errno == errno.EBADF:
                return False
            raise
        else:
            return True

    @classmethod
    def _writefd(cls, conn, data, create_dummy_fds=False):
        # Child helper: receive a handle over *conn* and write *data* to it.
        # With create_dummy_fds, fill every fd < 256 first so the received
        # descriptor is forced above 256 (see test_large_fd_transfer).
        if create_dummy_fds:
            for i in range(0, 256):
                if not cls._is_fd_assigned(i):
                    os.dup2(conn.fileno(), i)
        fd = reduction.recv_handle(conn)
        if msvcrt:
            fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
        os.write(fd, data)
        os.close(fd)

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    def test_fd_transfer(self):
        """A file descriptor sent with reduction.send_handle() is usable
        in the child process."""
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
        p.daemon = True
        p.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            if msvcrt:
                fd = msvcrt.get_osfhandle(fd)
            reduction.send_handle(conn, fd, p.pid)
        p.join()
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"foo")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32",
                     "test semantics don't make sense on Windows")
    @unittest.skipIf(MAXFD <= 256,
                     "largest assignable fd number is too small")
    @unittest.skipUnless(hasattr(os, "dup2"),
                         "test needs os.dup2()")
    def test_large_fd_transfer(self):
        # With fd > 256 (issue #11657)
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
        p.daemon = True
        p.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            for newfd in range(256, MAXFD):
                if not self._is_fd_assigned(newfd):
                    break
            else:
                self.fail("could not find an unassigned large file descriptor")
            os.dup2(fd, newfd)
            try:
                reduction.send_handle(conn, newfd, p.pid)
            finally:
                os.close(newfd)
        p.join()
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"bar")

    @classmethod
    def _send_data_without_fd(cls, conn):
        # Fix: first parameter of a classmethod is named cls (was 'self',
        # inconsistent with the other classmethods in this class).
        os.write(conn.fileno(), b"\0")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
    def test_missing_fd_transfer(self):
        # Check that exception is raised when received data is not
        # accompanied by a file descriptor in ancillary data.
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
        p.daemon = True
        p.start()
        self.assertRaises(RuntimeError, reduction.recv_handle, conn)
        p.join()

    def test_context(self):
        """Both ends of a pipe support the context-manager protocol."""
        a, b = self.Pipe()

        with a, b:
            a.send(1729)
            self.assertEqual(b.recv(), 1729)
            if self.TYPE == 'processes':
                self.assertFalse(a.closed)
                self.assertFalse(b.closed)

        if self.TYPE == 'processes':
            self.assertTrue(a.closed)
            self.assertTrue(b.closed)
            self.assertRaises(OSError, a.recv)
            self.assertRaises(OSError, b.recv)
3121
class _TestListener(BaseTestCase):
    """Tests for multiprocessing.connection.Listener."""

    ALLOWED_TYPES = ('processes',)

    def test_multiple_bind(self):
        # Binding a second listener to an already-bound address must fail.
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            self.addCleanup(listener.close)
            self.assertRaises(OSError, self.connection.Listener,
                              listener.address, family)

    def test_context(self):
        # Listener, client and the accepted connection all support `with`.
        with self.connection.Listener() as l, \
             self.connection.Client(l.address) as c, \
             l.accept() as d:
            c.send(1729)
            self.assertEqual(d.recv(), 1729)

        if self.TYPE == 'processes':
            # Leaving the block closed the listener, so accept() now fails.
            self.assertRaises(OSError, l.accept)
3142
class _TestListenerClient(BaseTestCase):
    """Listener/Client round trips, including historical regression cases."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        # Child helper: connect to *address*, send one message, disconnect.
        conn = cls.connection.Client(address)
        conn.send('hello')
        conn.close()

    def test_listener_client(self):
        """A client message is received over every supported family."""
        for family in self.connection.families:
            l = self.connection.Listener(family=family)
            p = self.Process(target=self._test, args=(l.address,))
            p.daemon = True
            p.start()
            conn = l.accept()
            self.assertEqual(conn.recv(), 'hello')
            p.join()
            l.close()

    def test_issue14725(self):
        l = self.connection.Listener()
        p = self.Process(target=self._test, args=(l.address,))
        p.daemon = True
        p.start()
        time.sleep(1)
        # On Windows the client process should by now have connected,
        # written data and closed the pipe handle by now.  This causes
        # ConnectNamedPipe() to fail with ERROR_NO_DATA.  See Issue
        # 14725.
        conn = l.accept()
        self.assertEqual(conn.recv(), 'hello')
        conn.close()
        p.join()
        l.close()

    def test_issue16955(self):
        # Data written on the accepted end must be observable via poll()
        # on the client end, for every family.
        for fam in self.connection.families:
            l = self.connection.Listener(family=fam)
            c = self.connection.Client(l.address)
            a = l.accept()
            a.send_bytes(b"hello")
            self.assertTrue(c.poll(1))
            a.close()
            c.close()
            l.close()
3190
class _TestPoll(BaseTestCase):
    """Tests for Connection.poll() semantics and message framing."""

    ALLOWED_TYPES = ('processes', 'threads')

    def test_empty_string(self):
        # An empty bytes message still makes poll() report readability.
        a, b = self.Pipe()
        self.assertEqual(a.poll(), False)
        b.send_bytes(b'')
        self.assertEqual(a.poll(), True)
        self.assertEqual(a.poll(), True)

    @classmethod
    def _child_strings(cls, conn, strings):
        # Child helper: drip-feed the messages with small delays.
        for s in strings:
            time.sleep(0.1)
            conn.send_bytes(s)
        conn.close()

    def test_strings(self):
        """Messages arrive whole and in order despite slow delivery."""
        strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
        a, b = self.Pipe()
        p = self.Process(target=self._child_strings, args=(b, strings))
        p.start()

        for s in strings:
            # Poll repeatedly (up to ~2s) until the next message arrives.
            for i in range(200):
                if a.poll(0.01):
                    break
            x = a.recv_bytes()
            self.assertEqual(s, x)

        p.join()

    @classmethod
    def _child_boundaries(cls, r):
        # Polling may "pull" a message in to the child process, but we
        # don't want it to pull only part of a message, as that would
        # corrupt the pipe for any other processes which might later
        # read from it.
        r.poll(5)

    def test_boundaries(self):
        """A poll() in another process must not break message boundaries."""
        r, w = self.Pipe(False)
        p = self.Process(target=self._child_boundaries, args=(r,))
        p.start()
        time.sleep(2)
        L = [b"first", b"second"]
        for obj in L:
            w.send_bytes(obj)
        w.close()
        p.join()
        # Whichever message is left must be one of the originals, intact.
        self.assertIn(r.recv_bytes(), L)

    @classmethod
    def _child_dont_merge(cls, b):
        b.send_bytes(b'a')
        b.send_bytes(b'b')
        b.send_bytes(b'cd')

    def test_dont_merge(self):
        """Consecutive sends stay distinct messages — never coalesced."""
        a, b = self.Pipe()
        self.assertEqual(a.poll(0.0), False)
        self.assertEqual(a.poll(0.1), False)

        p = self.Process(target=self._child_dont_merge, args=(b,))
        p.start()

        self.assertEqual(a.recv_bytes(), b'a')
        self.assertEqual(a.poll(1.0), True)
        self.assertEqual(a.poll(1.0), True)
        self.assertEqual(a.recv_bytes(), b'b')
        self.assertEqual(a.poll(1.0), True)
        self.assertEqual(a.poll(1.0), True)
        self.assertEqual(a.poll(0.0), True)
        self.assertEqual(a.recv_bytes(), b'cd')

        p.join()
3268
3269#
3270# Test of sending connection and socket objects between processes
3271#
3272
@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
class _TestPicklingConnections(BaseTestCase):
    """Test sending Connection and socket objects between processes."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def tearDownClass(cls):
        from multiprocessing import resource_sharer
        resource_sharer.stop(timeout=TIMEOUT)

    @classmethod
    def _listener(cls, conn, families):
        # Child "listener": for each family, create a Listener, report its
        # address over *conn*, then forward the accepted connection object
        # itself back to the parent (this is the pickling under test).
        for fam in families:
            l = cls.connection.Listener(family=fam)
            conn.send(l.address)
            new_conn = l.accept()
            conn.send(new_conn)
            new_conn.close()
            l.close()

        # Same dance with a plain socket instead of a Listener.
        l = socket.socket()
        l.bind((test.support.HOST, 0))
        l.listen()
        conn.send(l.getsockname())
        new_conn, addr = l.accept()
        conn.send(new_conn)
        new_conn.close()
        l.close()

        # Block until the parent signals it is done (see lconn.send(None)).
        conn.recv()

    @classmethod
    def _remote(cls, conn):
        # Child "remote": receive (address, msg) pairs, connect to each
        # address and send back the upper-cased message; None terminates.
        for (address, msg) in iter(conn.recv, None):
            client = cls.connection.Client(address)
            client.send(msg.upper())
            client.close()

        # Final pair targets a raw socket rather than a Connection.
        address, msg = conn.recv()
        client = socket.socket()
        client.connect(address)
        client.sendall(msg.upper())
        client.close()

        conn.close()

    def test_pickling(self):
        """Connections accepted in one child can be shipped to the parent
        and still carry data from another child."""
        families = self.connection.families

        lconn, lconn0 = self.Pipe()
        lp = self.Process(target=self._listener, args=(lconn0, families))
        lp.daemon = True
        lp.start()
        lconn0.close()

        rconn, rconn0 = self.Pipe()
        rp = self.Process(target=self._remote, args=(rconn0,))
        rp.daemon = True
        rp.start()
        rconn0.close()

        for fam in families:
            msg = ('This connection uses family %s' % fam).encode('ascii')
            address = lconn.recv()
            rconn.send((address, msg))
            new_conn = lconn.recv()
            self.assertEqual(new_conn.recv(), msg.upper())

        rconn.send(None)

        msg = latin('This connection uses a normal socket')
        address = lconn.recv()
        rconn.send((address, msg))
        new_conn = lconn.recv()
        buf = []
        while True:
            s = new_conn.recv(100)
            if not s:
                break
            buf.append(s)
        buf = b''.join(buf)
        self.assertEqual(buf, msg.upper())
        new_conn.close()

        lconn.send(None)

        rconn.close()
        lconn.close()

        lp.join()
        rp.join()

    @classmethod
    def child_access(cls, conn):
        # Child helper for test_access: use a write-only then a read-only
        # connection received over *conn*.
        w = conn.recv()
        w.send('all is well')
        w.close()

        r = conn.recv()
        msg = r.recv()
        conn.send(msg*2)

        conn.close()

    def test_access(self):
        # On Windows, if we do not specify a destination pid when
        # using DupHandle then we need to be careful to use the
        # correct access flags for DuplicateHandle(), or else
        # DupHandle.detach() will raise PermissionError.  For example,
        # for a read only pipe handle we should use
        # access=FILE_GENERIC_READ.  (Unfortunately
        # DUPLICATE_SAME_ACCESS does not work.)
        conn, child_conn = self.Pipe()
        p = self.Process(target=self.child_access, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()

        r, w = self.Pipe(duplex=False)
        conn.send(w)
        w.close()
        self.assertEqual(r.recv(), 'all is well')
        r.close()

        r, w = self.Pipe(duplex=False)
        conn.send(r)
        r.close()
        w.send('foobar')
        w.close()
        self.assertEqual(conn.recv(), 'foobar'*2)

        p.join()
3405
3406#
3407#
3408#
3409
class _TestHeap(BaseTestCase):
    """Tests for the private multiprocessing.heap arena allocator."""

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        """Allocate and free many random-sized blocks, then verify that the
        heap's free and occupied block lists tile each arena contiguously."""
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for i in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            b = multiprocessing.heap.BufferWrapper(size)
            blocks.append(b)
            if len(blocks) > maxblocks:
                i = random.randrange(maxblocks)
                del blocks[i]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap
        all = []
        occupied = 0
        # Hold the heap lock so the block lists are not mutated while we
        # inspect the private _len_to_seq / _allocated_blocks structures.
        heap._lock.acquire()
        self.addCleanup(heap._lock.release)
        for L in list(heap._len_to_seq.values()):
            for arena, start, stop in L:
                all.append((heap._arenas.index(arena), start, stop,
                            stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            all.append((heap._arenas.index(arena), start, stop,
                        stop-start, 'occupied'))
            occupied += (stop-start)

        all.sort()

        # Adjacent entries must abut exactly (or start a new arena at 0):
        # no gaps and no overlaps.
        for i in range(len(all)-1):
            (arena, start, stop) = all[i][:3]
            (narena, nstart, nstop) = all[i+1][:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))

    def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't deadlock
        # (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for i in range(5000):
            a = multiprocessing.heap.BufferWrapper(1)
            b = multiprocessing.heap.BufferWrapper(1)
            # circular references
            a.buddy = b
            b.buddy = a
3474
3475#
3476#
3477#
3478
3479class _Foo(Structure):
3480    _fields_ = [
3481        ('x', c_int),
3482        ('y', c_double),
3483        ('z', c_longlong,)
3484        ]
3485
class _TestSharedCTypes(BaseTestCase):
    """Tests for multiprocessing.sharedctypes Value/Array/copy."""

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, z, foo, arr, string):
        # Child helper: double every shared object in place.
        x.value *= 2
        y.value *= 2
        z.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for idx, item in enumerate(arr):
            arr[idx] = item * 2

    def test_sharedctypes(self, lock=False):
        """Mutations made in a child process are visible in the parent."""
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        z = Value(c_longlong, 2 ** 33, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', list(range(10)), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        proc = self.Process(target=self._double,
                            args=(x, y, z, foo, arr, string))
        proc.daemon = True
        proc.start()
        proc.join()

        # Every shared object should now hold exactly double its initial value.
        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(z.value, 2 ** 34)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for i in range(10):
            self.assertAlmostEqual(arr[i], i*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # Same scenario, but with lock-wrapped (synchronized) objects.
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        """copy() produces an independent snapshot of a ctypes structure."""
        original = _Foo(2, 5.0, 2 ** 33)
        duplicate = copy(original)
        original.x = 0
        original.y = 0
        original.z = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
        self.assertEqual(duplicate.z, 2 ** 33)
3540
3541#
3542#
3543#
3544
class _TestFinalize(BaseTestCase):
    """Tests for util.Finalize callbacks and their ordering at exit."""

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        # Isolate the test from finalizers registered elsewhere.
        self.registry_backup = util._finalizer_registry.copy()
        util._finalizer_registry.clear()

    def tearDown(self):
        self.assertFalse(util._finalizer_registry)
        util._finalizer_registry.update(self.registry_backup)

    @classmethod
    def _test_finalize(cls, conn):
        # Child-process helper: register finalizers in various ways and
        # report the order in which their callbacks fire over *conn*.
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        """Finalizers run in descending exitpriority order at exit;
        same-priority finalizers run newest-first (d03, d02, d01)."""
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._test_finalize, args=(child_conn,))
        p.daemon = True
        p.start()
        p.join()

        result = [obj for obj in iter(conn.recv, 'STOP')]
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])

    def test_thread_safety(self):
        # bpo-24484: _run_finalizers() should be thread-safe
        def cb():
            pass

        class Foo(object):
            def __init__(self):
                self.ref = self  # create reference cycle
                # insert finalizer at random key
                util.Finalize(self, cb, exitpriority=random.randint(1, 100))

        finish = False
        exc = None

        def run_finalizers():
            nonlocal exc
            while not finish:
                time.sleep(random.random() * 1e-1)
                try:
                    # A GC run will eventually happen during this,
                    # collecting stale Foo's and mutating the registry
                    util._run_finalizers()
                except Exception as e:
                    exc = e

        def make_finalizers():
            nonlocal exc
            d = {}
            while not finish:
                try:
                    # Old Foo's get gradually replaced and later
                    # collected by the GC (because of the cyclic ref)
                    d[random.getrandbits(5)] = {Foo() for i in range(10)}
                except Exception as e:
                    exc = e
                    d.clear()

        old_interval = sys.getswitchinterval()
        old_threshold = gc.get_threshold()
        try:
            # Aggressive thread switching + tiny GC thresholds to maximise
            # the chance of hitting the registry-mutation race.
            sys.setswitchinterval(1e-6)
            gc.set_threshold(5, 5, 5)
            threads = [threading.Thread(target=run_finalizers),
                       threading.Thread(target=make_finalizers)]
            with test.support.start_threads(threads):
                time.sleep(4.0)  # Wait a bit to trigger race condition
                finish = True
            if exc is not None:
                raise exc
        finally:
            sys.setswitchinterval(old_interval)
            gc.set_threshold(*old_threshold)
            gc.collect()  # Collect remaining Foo's
3659
3660
3661#
3662# Test that from ... import * works for each module
3663#
3664
3665class _TestImportStar(unittest.TestCase):
3666
3667    def get_module_names(self):
3668        import glob
3669        folder = os.path.dirname(multiprocessing.__file__)
3670        pattern = os.path.join(folder, '*.py')
3671        files = glob.glob(pattern)
3672        modules = [os.path.splitext(os.path.split(f)[1])[0] for f in files]
3673        modules = ['multiprocessing.' + m for m in modules]
3674        modules.remove('multiprocessing.__init__')
3675        modules.append('multiprocessing')
3676        return modules
3677
3678    def test_import(self):
3679        modules = self.get_module_names()
3680        if sys.platform == 'win32':
3681            modules.remove('multiprocessing.popen_fork')
3682            modules.remove('multiprocessing.popen_forkserver')
3683            modules.remove('multiprocessing.popen_spawn_posix')
3684        else:
3685            modules.remove('multiprocessing.popen_spawn_win32')
3686            if not HAS_REDUCTION:
3687                modules.remove('multiprocessing.popen_forkserver')
3688
3689        if c_int is None:
3690            # This module requires _ctypes
3691            modules.remove('multiprocessing.sharedctypes')
3692
3693        for name in modules:
3694            __import__(name)
3695            mod = sys.modules[name]
3696            self.assertTrue(hasattr(mod, '__all__'), name)
3697
3698            for attr in mod.__all__:
3699                self.assertTrue(
3700                    hasattr(mod, attr),
3701                    '%r does not have attribute %r' % (mod, attr)
3702                    )
3703
3704#
3705# Quick test that logging works -- does not test logging output
3706#
3707
class _TestLogging(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        # Smoke test: the package logger exists and silently swallows
        # messages below its threshold.
        logger = multiprocessing.get_logger()
        logger.setLevel(util.SUBWARNING)
        self.assertTrue(logger is not None)
        logger.debug('this will not be printed')
        logger.info('nor will this')
        logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        # Runs in the child: report the effective level it inherited.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        saved_root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)

        def effective_level_in_child():
            # Spawn a child and return the logging level it observes.
            worker = self.Process(target=self._test_level, args=(writer,))
            worker.start()
            level = reader.recv()
            worker.join()
            worker.close()
            return level

        # An explicit level on the multiprocessing logger propagates.
        logger.setLevel(LEVEL1)
        self.assertEqual(LEVEL1, effective_level_in_child())

        # With the logger unset, the root logger's level is effective.
        logger.setLevel(logging.NOTSET)
        root_logger.setLevel(LEVEL2)
        self.assertEqual(LEVEL2, effective_level_in_child())

        root_logger.setLevel(saved_root_level)
        logger.setLevel(level=LOG_LEVEL)
3752
3753
3754# class _TestLoggingProcessName(BaseTestCase):
3755#
3756#     def handle(self, record):
3757#         assert record.processName == multiprocessing.current_process().name
3758#         self.__handled = True
3759#
3760#     def test_logging(self):
3761#         handler = logging.Handler()
3762#         handler.handle = self.handle
3763#         self.__handled = False
3764#         # Bypass getLogger() and side-effects
3765#         logger = logging.getLoggerClass()(
3766#                 'multiprocessing.test.TestLoggingProcessName')
3767#         logger.addHandler(handler)
3768#         logger.propagate = False
3769#
3770#         logger.warn('foo')
3771#         assert self.__handled
3772
3773#
3774# Check that Process.join() retries if os.waitpid() fails with EINTR
3775#
3776
class _TestPollEintr(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _killer(cls, pid):
        # Give the parent a moment to block in join(), then interrupt it.
        time.sleep(0.1)
        os.kill(pid, signal.SIGUSR1)

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_poll_eintr(self):
        # Process.join() must retry os.waitpid() when interrupted by a
        # signal (EINTR) instead of raising.
        got_signal = [False]

        def record(*args):
            got_signal[0] = True

        own_pid = os.getpid()
        previous_handler = signal.signal(signal.SIGUSR1, record)
        try:
            interrupter = self.Process(target=self._killer, args=(own_pid,))
            interrupter.start()
            try:
                sleeper = self.Process(target=time.sleep, args=(2,))
                sleeper.start()
                sleeper.join()
            finally:
                interrupter.join()
            # The signal must have been delivered, and join() must still
            # have reported a clean exit.
            self.assertTrue(got_signal[0])
            self.assertEqual(sleeper.exitcode, 0)
        finally:
            signal.signal(signal.SIGUSR1, previous_handler)
3806
3807#
3808# Test to verify handle verification, see issue 3321
3809#
3810
class TestInvalidHandle(unittest.TestCase):
    """Issue 3321: operations on a Connection with a bogus handle must
    raise cleanly rather than crash."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        conn = multiprocessing.connection.Connection(44977608)
        try:
            # poll() may raise for a bad descriptor, but must not crash
            # the interpreter.
            conn.poll()
        except (ValueError, OSError):
            pass
        finally:
            # Hack private attribute _handle to avoid printing an error
            # in conn.__del__
            conn._handle = None
        # A clearly invalid handle is rejected at construction time.
        with self.assertRaises((ValueError, OSError)):
            multiprocessing.connection.Connection(-1)
3827
3828
3829
class OtherTest(unittest.TestCase):
    # TODO: add more tests for deliver/answer challenge.

    def test_deliver_challenge_auth_failure(self):
        # A peer answering the challenge with garbage must be rejected.
        class _FakeConnection(object):
            def recv_bytes(self, size):
                return b'something bogus'
            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(
                _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        # A server that sends a valid challenge but then rejects our
        # digest must surface an AuthenticationError.
        class _FakeConnection(object):
            def __init__(self):
                self.count = 0
            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                elif self.count == 2:
                    return b'something bogus'
                return b''
            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(
                _FakeConnection(), b'abc')
3858
3859#
3860# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
3861#
3862
def initializer(ns):
    """Bump the shared counter so tests can observe that we ran."""
    ns.test = ns.test + 1
3865
class TestInitializers(unittest.TestCase):
    """Issue 5585: initializer callbacks for Manager.start() and Pool()."""

    def setUp(self):
        # Shared namespace the initializer mutates from the worker.
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()
        self.mgr.join()

    def test_manager_initializer(self):
        manager = multiprocessing.managers.SyncManager()
        # A non-callable initializer is rejected up front.
        self.assertRaises(TypeError, manager.start, 1)
        manager.start(initializer, (self.ns,))
        self.assertEqual(self.ns.test, 1)
        manager.shutdown()
        manager.join()

    def test_pool_initializer(self):
        # A non-callable initializer is rejected up front.
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        pool = multiprocessing.Pool(1, initializer, (self.ns,))
        pool.close()
        pool.join()
        self.assertEqual(self.ns.test, 1)
3890
3891#
3892# Issue 5155, 5313, 5331: Test process in processes
# Verifies os.close(sys.stdin.fileno()) vs. sys.stdin.close() behavior
3894#
3895
3896def _this_sub_process(q):
3897    try:
3898        item = q.get(block=False)
3899    except pyqueue.Empty:
3900        pass
3901
def _test_process():
    """Spawn a daemonic child that reads from a fresh queue and wait for it
    (exercises multiprocessing from inside another process)."""
    q = multiprocessing.Queue()
    child = multiprocessing.Process(target=_this_sub_process, args=(q,))
    child.daemon = True
    child.start()
    child.join()
3908
3909def _afunc(x):
3910    return x*x
3911
def pool_in_process():
    """Create and use a Pool from inside a child process; must not hang."""
    pool = multiprocessing.Pool(processes=4)
    squares = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    pool.close()
    pool.join()
3917
3918class _file_like(object):
3919    def __init__(self, delegate):
3920        self._delegate = delegate
3921        self._pid = None
3922
3923    @property
3924    def cache(self):
3925        pid = os.getpid()
3926        # There are no race conditions since fork keeps only the running thread
3927        if pid != self._pid:
3928            self._pid = pid
3929            self._cache = []
3930        return self._cache
3931
3932    def write(self, data):
3933        self.cache.append(data)
3934
3935    def flush(self):
3936        self._delegate.write(''.join(self.cache))
3937        self._cache = []
3938
class TestStdinBadfiledescriptor(unittest.TestCase):
    """Issues 5155, 5313, 5331: using multiprocessing (queues, pools) from
    inside a child process after stdin handling has been altered."""

    def test_queue_in_process(self):
        # A child that itself spawns a queue-reading grandchild.
        proc = multiprocessing.Process(target=_test_process)
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        # A child that itself creates and uses a Pool.
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): this Process is constructed but never started or
        # joined; presumably only object construction is being exercised
        # here -- confirm the original intent.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # Bug fix: a bare ``assert`` statement is stripped under
        # ``python -O``; use the unittest assertion so the check always
        # runs and produces a useful failure message.
        self.assertEqual(sio.getvalue(), 'foo')
3958
3959
class TestWait(unittest.TestCase):
    """Tests for multiprocessing.connection.wait() with pipe connections,
    plain sockets, process sentinels and timeouts."""

    @classmethod
    def _child_test_wait(cls, w, slow):
        # Runs in a child process: send ten (index, pid) tuples, pausing
        # randomly between sends when *slow* is true.
        for i in range(10):
            if slow:
                time.sleep(random.random()*0.1)
            w.send((i, os.getpid()))
        w.close()

    def test_wait(self, slow=False):
        # wait() must keep reporting readable pipe connections until every
        # child has sent its messages and closed its end.
        from multiprocessing.connection import wait
        readers = []
        procs = []
        messages = []

        for i in range(4):
            r, w = multiprocessing.Pipe(duplex=False)
            p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow))
            p.daemon = True
            p.start()
            w.close()  # only the child keeps the write end open
            readers.append(r)
            procs.append(p)
            self.addCleanup(p.join)

        while readers:
            for r in wait(readers):
                try:
                    msg = r.recv()
                except EOFError:
                    # child closed its end: this reader is exhausted
                    readers.remove(r)
                    r.close()
                else:
                    messages.append(msg)

        messages.sort()
        expected = sorted((i, p.pid) for i in range(10) for p in procs)
        self.assertEqual(messages, expected)

    @classmethod
    def _child_test_wait_socket(cls, address, slow):
        # Runs in a child process: connect back to the parent's listening
        # socket and send the digits 0-9, one per line.
        s = socket.socket()
        s.connect(address)
        for i in range(10):
            if slow:
                time.sleep(random.random()*0.1)
            s.sendall(('%s\n' % i).encode('ascii'))
        s.close()

    def test_wait_socket(self, slow=False):
        # wait() must also accept plain socket objects.
        from multiprocessing.connection import wait
        l = socket.socket()
        l.bind((test.support.HOST, 0))
        l.listen()
        addr = l.getsockname()
        readers = []
        procs = []
        dic = {}

        for i in range(4):
            p = multiprocessing.Process(target=self._child_test_wait_socket,
                                        args=(addr, slow))
            p.daemon = True
            p.start()
            procs.append(p)
            self.addCleanup(p.join)

        for i in range(4):
            r, _ = l.accept()
            readers.append(r)
            dic[r] = []
        l.close()

        while readers:
            for r in wait(readers):
                msg = r.recv(32)
                if not msg:
                    # an empty read means the peer closed the connection
                    readers.remove(r)
                    r.close()
                else:
                    dic[r].append(msg)

        expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
        for v in dic.values():
            self.assertEqual(b''.join(v), expected)

    def test_wait_slow(self):
        self.test_wait(True)

    def test_wait_socket_slow(self):
        self.test_wait_socket(True)

    def test_wait_timeout(self):
        # With nothing ready, wait() must return [] after roughly the
        # requested timeout; once data is available it must return quickly.
        from multiprocessing.connection import wait

        expected = 5
        a, b = multiprocessing.Pipe()

        start = time.monotonic()
        res = wait([a, b], expected)
        delta = time.monotonic() - start

        self.assertEqual(res, [])
        # generous bounds to tolerate slow machines
        self.assertLess(delta, expected * 2)
        self.assertGreater(delta, expected * 0.5)

        b.send(None)

        start = time.monotonic()
        res = wait([a, b], 20)
        delta = time.monotonic() - start

        self.assertEqual(res, [a])
        self.assertLess(delta, 0.4)

    @classmethod
    def signal_and_sleep(cls, sem, period):
        # Runs in a child process: signal readiness, then stay alive for
        # *period* seconds so the process sentinel does not fire early.
        sem.release()
        time.sleep(period)

    def test_wait_integer(self):
        # wait() must accept raw fds/handles such as Process.sentinel and
        # report them ready when the process exits.
        from multiprocessing.connection import wait

        expected = 3
        sorted_ = lambda l: sorted(l, key=lambda x: id(x))
        sem = multiprocessing.Semaphore(0)
        a, b = multiprocessing.Pipe()
        p = multiprocessing.Process(target=self.signal_and_sleep,
                                    args=(sem, expected))

        p.start()
        self.assertIsInstance(p.sentinel, int)
        self.assertTrue(sem.acquire(timeout=20))

        start = time.monotonic()
        res = wait([a, p.sentinel, b], expected + 20)
        delta = time.monotonic() - start

        self.assertEqual(res, [p.sentinel])
        self.assertLess(delta, expected + 2)
        self.assertGreater(delta, expected - 2)

        a.send(None)

        start = time.monotonic()
        res = wait([a, p.sentinel, b], 20)
        delta = time.monotonic() - start

        self.assertEqual(sorted_(res), sorted_([p.sentinel, b]))
        self.assertLess(delta, 0.4)

        b.send(None)

        start = time.monotonic()
        res = wait([a, p.sentinel, b], 20)
        delta = time.monotonic() - start

        self.assertEqual(sorted_(res), sorted_([a, p.sentinel, b]))
        self.assertLess(delta, 0.4)

        p.terminate()
        p.join()

    def test_neg_timeout(self):
        # A negative timeout must behave like zero: return immediately.
        from multiprocessing.connection import wait
        a, b = multiprocessing.Pipe()
        t = time.monotonic()
        res = wait([a], timeout=-1)
        t = time.monotonic() - t
        self.assertEqual(res, [])
        self.assertLess(t, 1)
        a.close()
        b.close()
4134
4135#
4136# Issue 14151: Test invalid family on invalid environment
4137#
4138
class TestInvalidFamily(unittest.TestCase):
    """Issue 14151: Listener must reject addresses whose implied family is
    not available on the current platform."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_family(self):
        # A Windows named-pipe style address is invalid on POSIX.
        self.assertRaises(ValueError,
                          multiprocessing.connection.Listener, r'\\.\test')

    @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
    def test_invalid_family_win32(self):
        # A filesystem path (AF_UNIX style) address is invalid on Windows.
        self.assertRaises(ValueError,
                          multiprocessing.connection.Listener, '/var/test.pipe')
4150
4151#
4152# Issue 12098: check sys.flags of child matches that for parent
4153#
4154
class TestFlags(unittest.TestCase):
    """Issue 12098: sys.flags of a child must match those of its parent."""

    @classmethod
    def run_in_grandchild(cls, conn):
        # Deepest process: report its sys.flags back up the pipe.
        conn.send(tuple(sys.flags))

    @classmethod
    def run_in_child(cls):
        # Middle process: spawn a grandchild and print both flag tuples as
        # JSON on stdout for the top-level test to compare.
        import json
        reader, writer = multiprocessing.Pipe(duplex=False)
        grandchild = multiprocessing.Process(target=cls.run_in_grandchild,
                                             args=(writer,))
        grandchild.start()
        grandchild_flags = reader.recv()
        grandchild.join()
        reader.close()
        writer.close()
        print(json.dumps((tuple(sys.flags), grandchild_flags)))

    def test_flags(self):
        import json, subprocess
        # start child process using unusual flags
        prog = ('from test._test_multiprocessing import TestFlags; ' +
                'TestFlags.run_in_child()')
        data = subprocess.check_output(
            [sys.executable, '-E', '-S', '-O', '-c', prog])
        child_flags, grandchild_flags = json.loads(data.decode('ascii'))
        self.assertEqual(child_flags, grandchild_flags)
4182
4183#
4184# Test interaction with socket timeouts - see Issue #6056
4185#
4186
class TestTimeouts(unittest.TestCase):
    """Issue #6056: multiprocessing connections must keep working while a
    global socket default timeout is in effect."""

    @classmethod
    def _test_timeout(cls, child, address):
        # Runs in the child: answer on the pipe first, then connect back
        # to the parent's listener.
        time.sleep(1)
        child.send(123)
        child.close()
        client = multiprocessing.connection.Client(address)
        client.send(456)
        client.close()

    def test_timeout(self):
        saved_timeout = socket.getdefaulttimeout()
        try:
            socket.setdefaulttimeout(0.1)
            parent, child = multiprocessing.Pipe(duplex=True)
            listener = multiprocessing.connection.Listener(family='AF_INET')
            worker = multiprocessing.Process(target=self._test_timeout,
                                             args=(child, listener.address))
            worker.start()
            child.close()
            # Pipe traffic must not be affected by the socket timeout.
            self.assertEqual(parent.recv(), 123)
            parent.close()
            # Neither must accept() on a multiprocessing listener.
            conn = listener.accept()
            self.assertEqual(conn.recv(), 456)
            conn.close()
            listener.close()
            join_process(worker)
        finally:
            socket.setdefaulttimeout(saved_timeout)
4216
4217#
4218# Test what happens with no "if __name__ == '__main__'"
4219#
4220
class TestNoForkBomb(unittest.TestCase):
    """A script lacking the ``if __name__ == '__main__'`` guard must fail
    cleanly under spawn/forkserver instead of forking endlessly."""

    def test_noforkbomb(self):
        start_method = multiprocessing.get_start_method()
        script = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
        if start_method == 'fork':
            # fork inherits module state, so the unguarded script succeeds.
            rc, out, err = test.support.script_helper.assert_python_ok(
                script, start_method)
            self.assertEqual(out.rstrip(), b'123')
            self.assertEqual(err, b'')
        else:
            # spawn/forkserver re-import the main module and must abort.
            rc, out, err = test.support.script_helper.assert_python_failure(
                script, start_method)
            self.assertEqual(out, b'')
            self.assertIn(b'RuntimeError', err)
4233
4234#
4235# Issue #17555: ForkAwareThreadLock
4236#
4237
class TestForkAwareThreadLock(unittest.TestCase):
    # We recursively start processes.  Issue #17555 meant that the
    # after fork registry would get duplicate entries for the same
    # lock.  The size of the registry at generation n was ~2**n.

    @classmethod
    def child(cls, n, conn):
        if n > 1:
            # Recurse: start the next generation and wait for it.
            grandchild = multiprocessing.Process(target=cls.child,
                                                 args=(n-1, conn))
            grandchild.start()
            conn.close()
            join_process(grandchild)
        else:
            # Deepest generation reports the registry size back up.
            conn.send(len(util._afterfork_registry))
        conn.close()

    def test_lock(self):
        reader, writer = multiprocessing.Pipe(False)
        lock = util.ForkAwareThreadLock()
        size_before = len(util._afterfork_registry)
        p = multiprocessing.Process(target=self.child, args=(5, writer))
        p.start()
        writer.close()
        size_after = reader.recv()
        join_process(p)
        # The registry must not have grown across five fork generations.
        self.assertLessEqual(size_after, size_before)
4264
4265#
4266# Check that non-forked child processes do not inherit unneeded fds/handles
4267#
4268
class TestCloseFds(unittest.TestCase):
    """Non-forked child processes must not inherit unneeded fds/handles."""

    def get_high_socket_fd(self):
        # Return a socket fd (or, on Windows, a raw handle number) that a
        # freshly spawned child is very unlikely to have open.
        if WIN32:
            # The child process will not have any socket handles, so
            # calling socket.fromfd() should produce WSAENOTSOCK even
            # if there is a handle of the same number.
            return socket.socket().detach()
        else:
            # We want to produce a socket with an fd high enough that a
            # freshly created child process will not have any fds as high.
            fd = socket.socket().detach()
            to_close = []
            while fd < 50:
                to_close.append(fd)
                fd = os.dup(fd)
            for x in to_close:
                os.close(x)
            return fd

    def close(self, fd):
        # Release a value previously returned by get_high_socket_fd().
        if WIN32:
            socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=fd).close()
        else:
            os.close(fd)

    @classmethod
    def _test_closefds(cls, conn, fd):
        # Runs in the child: report whether *fd* is usable there (None on
        # success, otherwise the exception raised).
        try:
            s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
        except Exception as e:
            conn.send(e)
        else:
            s.close()
            conn.send(None)

    def test_closefd(self):
        if not HAS_REDUCTION:
            raise unittest.SkipTest('requires fd pickling')

        reader, writer = multiprocessing.Pipe()
        fd = self.get_high_socket_fd()
        try:
            p = multiprocessing.Process(target=self._test_closefds,
                                        args=(writer, fd))
            p.start()
            writer.close()
            e = reader.recv()
            join_process(p)
        finally:
            self.close(fd)
            writer.close()
            reader.close()

        # With the fork start method the child inherits the fd and can use
        # it; with spawn/forkserver the fd must not be a valid socket there.
        if multiprocessing.get_start_method() == 'fork':
            self.assertIs(e, None)
        else:
            WSAENOTSOCK = 10038
            self.assertIsInstance(e, OSError)
            self.assertTrue(e.errno == errno.EBADF or
                            e.winerror == WSAENOTSOCK, e)
4330
4331#
4332# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
4333#
4334
class TestIgnoreEINTR(unittest.TestCase):
    """Issue #17097: EINTR should be ignored by recv(), send(), accept()
    etc. -- a signal delivered mid-call must not abort the operation."""

    # Sending CONN_MAX_SIZE bytes into a multiprocessing pipe must block
    CONN_MAX_SIZE = max(support.PIPE_MAX_SIZE, support.SOCK_MAX_SIZE)

    @classmethod
    def _test_ignore(cls, conn):
        # Runs in the child: install a no-op SIGUSR1 handler, then perform
        # pipe I/O that the parent interrupts with signals.
        def handler(signum, frame):
            pass
        signal.signal(signal.SIGUSR1, handler)
        conn.send('ready')
        x = conn.recv()
        conn.send(x)
        # Large enough to block in send_bytes until the parent reads.
        conn.send_bytes(b'x' * cls.CONN_MAX_SIZE)

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_ignore(self):
        conn, child_conn = multiprocessing.Pipe()
        try:
            p = multiprocessing.Process(target=self._test_ignore,
                                        args=(child_conn,))
            p.daemon = True
            p.start()
            child_conn.close()
            self.assertEqual(conn.recv(), 'ready')
            # Interrupt the child while it blocks in recv().
            time.sleep(0.1)
            os.kill(p.pid, signal.SIGUSR1)
            time.sleep(0.1)
            conn.send(1234)
            self.assertEqual(conn.recv(), 1234)
            # Interrupt the child while it blocks in send_bytes().
            time.sleep(0.1)
            os.kill(p.pid, signal.SIGUSR1)
            self.assertEqual(conn.recv_bytes(), b'x' * self.CONN_MAX_SIZE)
            time.sleep(0.1)
            p.join()
        finally:
            conn.close()

    @classmethod
    def _test_ignore_listener(cls, conn):
        # Runs in the child: accept() must survive SIGUSR1 delivery.
        def handler(signum, frame):
            pass
        signal.signal(signal.SIGUSR1, handler)
        with multiprocessing.connection.Listener() as l:
            conn.send(l.address)
            a = l.accept()
            a.send('welcome')

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_ignore_listener(self):
        conn, child_conn = multiprocessing.Pipe()
        try:
            p = multiprocessing.Process(target=self._test_ignore_listener,
                                        args=(child_conn,))
            p.daemon = True
            p.start()
            child_conn.close()
            address = conn.recv()
            # Interrupt the child while it blocks in accept().
            time.sleep(0.1)
            os.kill(p.pid, signal.SIGUSR1)
            time.sleep(0.1)
            client = multiprocessing.connection.Client(address)
            self.assertEqual(client.recv(), 'welcome')
            p.join()
        finally:
            conn.close()
4401
4402class TestStartMethod(unittest.TestCase):
4403    @classmethod
4404    def _check_context(cls, conn):
4405        conn.send(multiprocessing.get_start_method())
4406
4407    def check_context(self, ctx):
4408        r, w = ctx.Pipe(duplex=False)
4409        p = ctx.Process(target=self._check_context, args=(w,))
4410        p.start()
4411        w.close()
4412        child_method = r.recv()
4413        r.close()
4414        p.join()
4415        self.assertEqual(child_method, ctx.get_start_method())
4416
4417    def test_context(self):
4418        for method in ('fork', 'spawn', 'forkserver'):
4419            try:
4420                ctx = multiprocessing.get_context(method)
4421            except ValueError:
4422                continue
4423            self.assertEqual(ctx.get_start_method(), method)
4424            self.assertIs(ctx.get_context(), ctx)
4425            self.assertRaises(ValueError, ctx.set_start_method, 'spawn')
4426            self.assertRaises(ValueError, ctx.set_start_method, None)
4427            self.check_context(ctx)
4428
4429    def test_set_get(self):
4430        multiprocessing.set_forkserver_preload(PRELOAD)
4431        count = 0
4432        old_method = multiprocessing.get_start_method()
4433        try:
4434            for method in ('fork', 'spawn', 'forkserver'):
4435                try:
4436                    multiprocessing.set_start_method(method, force=True)
4437                except ValueError:
4438                    continue
4439                self.assertEqual(multiprocessing.get_start_method(), method)
4440                ctx = multiprocessing.get_context()
4441                self.assertEqual(ctx.get_start_method(), method)
4442                self.assertTrue(type(ctx).__name__.lower().startswith(method))
4443                self.assertTrue(
4444                    ctx.Process.__name__.lower().startswith(method))
4445                self.check_context(multiprocessing)
4446                count += 1
4447        finally:
4448            multiprocessing.set_start_method(old_method, force=True)
4449        self.assertGreaterEqual(count, 1)
4450
4451    def test_get_all(self):
4452        methods = multiprocessing.get_all_start_methods()
4453        if sys.platform == 'win32':
4454            self.assertEqual(methods, ['spawn'])
4455        else:
4456            self.assertTrue(methods == ['fork', 'spawn'] or
4457                            methods == ['fork', 'spawn', 'forkserver'])
4458
4459    def test_preload_resources(self):
4460        if multiprocessing.get_start_method() != 'forkserver':
4461            self.skipTest("test only relevant for 'forkserver' method")
4462        name = os.path.join(os.path.dirname(__file__), 'mp_preload.py')
4463        rc, out, err = test.support.script_helper.assert_python_ok(name)
4464        out = out.decode()
4465        err = err.decode()
4466        if out.rstrip() != 'ok' or err != '':
4467            print(out)
4468            print(err)
4469            self.fail("failed spawning forkserver or grandchild")
4470
4471
4472@unittest.skipIf(sys.platform == "win32",
4473                 "test semantics don't make sense on Windows")
4474class TestSemaphoreTracker(unittest.TestCase):
4475
4476    def test_semaphore_tracker(self):
4477        #
4478        # Check that killing process does not leak named semaphores
4479        #
4480        import subprocess
4481        cmd = '''if 1:
4482            import multiprocessing as mp, time, os
4483            mp.set_start_method("spawn")
4484            lock1 = mp.Lock()
4485            lock2 = mp.Lock()
4486            os.write(%d, lock1._semlock.name.encode("ascii") + b"\\n")
4487            os.write(%d, lock2._semlock.name.encode("ascii") + b"\\n")
4488            time.sleep(10)
4489        '''
4490        r, w = os.pipe()
4491        p = subprocess.Popen([sys.executable,
4492                             '-E', '-c', cmd % (w, w)],
4493                             pass_fds=[w],
4494                             stderr=subprocess.PIPE)
4495        os.close(w)
4496        with open(r, 'rb', closefd=True) as f:
4497            name1 = f.readline().rstrip().decode('ascii')
4498            name2 = f.readline().rstrip().decode('ascii')
4499        _multiprocessing.sem_unlink(name1)
4500        p.terminate()
4501        p.wait()
4502        time.sleep(2.0)
4503        with self.assertRaises(OSError) as ctx:
4504            _multiprocessing.sem_unlink(name2)
4505        # docs say it should be ENOENT, but OSX seems to give EINVAL
4506        self.assertIn(ctx.exception.errno, (errno.ENOENT, errno.EINVAL))
4507        err = p.stderr.read().decode('utf-8')
4508        p.stderr.close()
4509        expected = 'semaphore_tracker: There appear to be 2 leaked semaphores'
4510        self.assertRegex(err, expected)
4511        self.assertRegex(err, r'semaphore_tracker: %r: \[Errno' % name1)
4512
4513    def check_semaphore_tracker_death(self, signum, should_die):
4514        # bpo-31310: if the semaphore tracker process has died, it should
4515        # be restarted implicitly.
4516        from multiprocessing.semaphore_tracker import _semaphore_tracker
4517        _semaphore_tracker.ensure_running()
4518        pid = _semaphore_tracker._pid
4519        os.kill(pid, signum)
4520        time.sleep(1.0)  # give it time to die
4521
4522        ctx = multiprocessing.get_context("spawn")
4523        with contextlib.ExitStack() as stack:
4524            if should_die:
4525                stack.enter_context(self.assertWarnsRegex(
4526                    UserWarning,
4527                    "semaphore_tracker: process died"))
4528            sem = ctx.Semaphore()
4529            sem.acquire()
4530            sem.release()
4531            wr = weakref.ref(sem)
4532            # ensure `sem` gets collected, which triggers communication with
4533            # the semaphore tracker
4534            del sem
4535            gc.collect()
4536            self.assertIsNone(wr())
4537
    def test_semaphore_tracker_sigint(self):
        # Catchable signal (ignored by semaphore tracker)
        # The tracker should survive SIGINT, so no warning is expected.
        self.check_semaphore_tracker_death(signal.SIGINT, False)
4541
    def test_semaphore_tracker_sigkill(self):
        # Uncatchable signal.
        # SIGKILL must kill the tracker; a "process died" warning is expected
        # when it is implicitly restarted.
        self.check_semaphore_tracker_death(signal.SIGKILL, True)
4545
4546
class TestSimpleQueue(unittest.TestCase):
    """Tests for multiprocessing.SimpleQueue."""

    @classmethod
    def _test_empty(cls, queue, child_can_start, parent_can_continue):
        # Child process: report empty() twice — once while the queue holds
        # no items, then again after the first result has been enqueued.
        child_can_start.wait()
        # issue 30301, could fail under spawn and forkserver
        try:
            queue.put(queue.empty())
            queue.put(queue.empty())
        finally:
            parent_can_continue.set()

    def test_empty(self):
        q = multiprocessing.SimpleQueue()
        child_can_start = multiprocessing.Event()
        parent_can_continue = multiprocessing.Event()

        worker = multiprocessing.Process(
            target=self._test_empty,
            args=(q, child_can_start, parent_can_continue)
        )
        worker.daemon = True
        worker.start()

        # Nothing has been put yet.
        self.assertTrue(q.empty())

        child_can_start.set()
        parent_can_continue.wait()

        # The child saw an empty queue first, then a non-empty one.
        self.assertFalse(q.empty())
        self.assertEqual(q.get(), True)
        self.assertEqual(q.get(), False)
        self.assertTrue(q.empty())

        worker.join()
4582
4583
class TestSyncManagerTypes(unittest.TestCase):
    """Test all the types which can be shared between a parent and a
    child process by using a manager which acts as an intermediary
    between them.

    In the following unit-tests the base type is created in the parent
    process, the @classmethod represents the worker process and the
    shared object is readable and editable between the two.

    # The child.
    @classmethod
    def _test_list(cls, obj):
        assert obj[0] == 5
        assert obj.append(6)

    # The parent.
    def test_list(self):
        o = self.manager.list()
        o.append(5)
        self.run_worker(self._test_list, o)
        assert o[1] == 6
    """
    manager_class = multiprocessing.managers.SyncManager

    def setUp(self):
        # Fresh manager per test; self.proc tracks the worker process so
        # tearDown can reap it even if the test failed mid-way.
        self.manager = self.manager_class()
        self.manager.start()
        self.proc = None

    def tearDown(self):
        if self.proc is not None and self.proc.is_alive():
            self.proc.terminate()
            self.proc.join()
        self.manager.shutdown()
        self.manager = None
        self.proc = None

    @classmethod
    def setUpClass(cls):
        support.reap_children()

    tearDownClass = setUpClass

    def wait_proc_exit(self):
        # Only the manager process should be returned by active_children()
        # but this can take a bit on slow machines, so wait a few seconds
        # if there are other children too (see #17395).
        join_process(self.proc)
        start_time = time.monotonic()
        t = 0.01
        while len(multiprocessing.active_children()) > 1:
            time.sleep(t)
            t *= 2
            dt = time.monotonic() - start_time
            if dt >= 5.0:
                test.support.environment_altered = True
                print("Warning -- multiprocessing.Manager still has %s active "
                      "children after %s seconds"
                      % (multiprocessing.active_children(), dt),
                      file=sys.stderr)
                break

    def run_worker(self, worker, obj):
        # Run *worker* in a child process with the proxy *obj* as its only
        # argument, and require a clean (exitcode 0) exit.
        self.proc = multiprocessing.Process(target=worker, args=(obj, ))
        self.proc.daemon = True
        self.proc.start()
        self.wait_proc_exit()
        self.assertEqual(self.proc.exitcode, 0)

    @classmethod
    def _test_queue(cls, obj):
        assert obj.qsize() == 2
        assert obj.full()
        assert not obj.empty()
        assert obj.get() == 5
        assert not obj.empty()
        assert obj.get() == 6
        assert obj.empty()

    def test_queue(self, qname="Queue"):
        o = getattr(self.manager, qname)(2)
        o.put(5)
        o.put(6)
        self.run_worker(self._test_queue, o)
        assert o.empty()
        assert not o.full()

    def test_joinable_queue(self):
        self.test_queue("JoinableQueue")

    @classmethod
    def _test_event(cls, obj):
        assert obj.is_set()
        obj.wait()
        obj.clear()
        obj.wait(0.001)

    def test_event(self):
        o = self.manager.Event()
        o.set()
        self.run_worker(self._test_event, o)
        assert not o.is_set()
        o.wait(0.001)

    @classmethod
    def _test_lock(cls, obj):
        obj.acquire()

    def test_lock(self, lname="Lock"):
        o = getattr(self.manager, lname)()
        self.run_worker(self._test_lock, o)
        o.release()
        self.assertRaises(RuntimeError, o.release)  # already released

    @classmethod
    def _test_rlock(cls, obj):
        # A reentrant lock must allow nested acquisition by the same owner.
        obj.acquire()
        obj.acquire()
        obj.release()
        obj.release()

    def test_rlock(self, lname="RLock"):
        # Bugfix: this previously defaulted to "Lock", so the manager's
        # RLock proxy was never actually exercised.
        o = getattr(self.manager, lname)()
        self.run_worker(self._test_rlock, o)

    @classmethod
    def _test_semaphore(cls, obj):
        obj.acquire()

    def test_semaphore(self, sname="Semaphore"):
        o = getattr(self.manager, sname)()
        self.run_worker(self._test_semaphore, o)
        o.release()

    def test_bounded_semaphore(self):
        self.test_semaphore(sname="BoundedSemaphore")

    @classmethod
    def _test_condition(cls, obj):
        obj.acquire()
        obj.release()

    def test_condition(self):
        o = self.manager.Condition()
        self.run_worker(self._test_condition, o)

    @classmethod
    def _test_barrier(cls, obj):
        assert obj.parties == 5
        obj.reset()

    def test_barrier(self):
        o = self.manager.Barrier(5)
        self.run_worker(self._test_barrier, o)

    @classmethod
    def _test_pool(cls, obj):
        # TODO: fix https://bugs.python.org/issue35919
        with obj:
            pass

    def test_pool(self):
        o = self.manager.Pool(processes=4)
        self.run_worker(self._test_pool, o)

    @classmethod
    def _test_list(cls, obj):
        assert obj[0] == 5
        assert obj.count(5) == 1
        assert obj.index(5) == 0
        obj.sort()
        obj.reverse()
        for x in obj:
            pass
        assert len(obj) == 1
        assert obj.pop(0) == 5

    def test_list(self):
        o = self.manager.list()
        o.append(5)
        self.run_worker(self._test_list, o)
        assert not o
        self.assertEqual(len(o), 0)

    @classmethod
    def _test_dict(cls, obj):
        assert len(obj) == 1
        assert obj['foo'] == 5
        assert obj.get('foo') == 5
        assert list(obj.items()) == [('foo', 5)]
        assert list(obj.keys()) == ['foo']
        assert list(obj.values()) == [5]
        assert obj.copy() == {'foo': 5}
        assert obj.popitem() == ('foo', 5)

    def test_dict(self):
        o = self.manager.dict()
        o['foo'] = 5
        self.run_worker(self._test_dict, o)
        assert not o
        self.assertEqual(len(o), 0)

    @classmethod
    def _test_value(cls, obj):
        assert obj.value == 1
        assert obj.get() == 1
        obj.set(2)

    def test_value(self):
        o = self.manager.Value('i', 1)
        self.run_worker(self._test_value, o)
        self.assertEqual(o.value, 2)
        self.assertEqual(o.get(), 2)

    @classmethod
    def _test_array(cls, obj):
        assert obj[0] == 0
        assert obj[1] == 1
        assert len(obj) == 2
        assert list(obj) == [0, 1]

    def test_array(self):
        o = self.manager.Array('i', [0, 1])
        self.run_worker(self._test_array, o)

    @classmethod
    def _test_namespace(cls, obj):
        assert obj.x == 0
        assert obj.y == 1

    def test_namespace(self):
        o = self.manager.Namespace()
        o.x = 0
        o.y = 1
        self.run_worker(self._test_namespace, o)
4817
4818
4819#
4820# Mixins
4821#
4822
class BaseMixin(object):
    """Snapshot dangling processes/threads when the class is set up, and
    report (to stderr) anything new left dangling at teardown."""

    @classmethod
    def setUpClass(cls):
        # Remember what was already dangling so teardown reports only the
        # leaks introduced by this test class.
        cls.dangling = (multiprocessing.process._dangling.copy(),
                        threading._dangling.copy())

    @classmethod
    def tearDownClass(cls):
        # bpo-26762: Some multiprocessing objects like Pool create reference
        # cycles. Trigger a garbage collection to break these cycles.
        test.support.gc_collect()

        leaked_procs = set(multiprocessing.process._dangling) - set(cls.dangling[0])
        if leaked_procs:
            test.support.environment_altered = True
            print('Warning -- Dangling processes: %s' % leaked_procs,
                  file=sys.stderr)
        leaked_procs = None

        leaked_threads = set(threading._dangling) - set(cls.dangling[1])
        if leaked_threads:
            test.support.environment_altered = True
            print('Warning -- Dangling threads: %s' % leaked_threads,
                  file=sys.stderr)
        leaked_threads = None
4848
4849
class ProcessesMixin(BaseMixin):
    # Mixin that runs the shared test cases against real OS processes:
    # every factory is the plain multiprocessing API. staticmethod() is
    # needed so the functions are not turned into methods on lookup.
    TYPE = 'processes'
    Process = multiprocessing.Process
    connection = multiprocessing.connection
    current_process = staticmethod(multiprocessing.current_process)
    active_children = staticmethod(multiprocessing.active_children)
    Pool = staticmethod(multiprocessing.Pool)
    Pipe = staticmethod(multiprocessing.Pipe)
    Queue = staticmethod(multiprocessing.Queue)
    JoinableQueue = staticmethod(multiprocessing.JoinableQueue)
    Lock = staticmethod(multiprocessing.Lock)
    RLock = staticmethod(multiprocessing.RLock)
    Semaphore = staticmethod(multiprocessing.Semaphore)
    BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
    Condition = staticmethod(multiprocessing.Condition)
    Event = staticmethod(multiprocessing.Event)
    Barrier = staticmethod(multiprocessing.Barrier)
    Value = staticmethod(multiprocessing.Value)
    Array = staticmethod(multiprocessing.Array)
    RawValue = staticmethod(multiprocessing.RawValue)
    RawArray = staticmethod(multiprocessing.RawArray)
4871
4872
class ManagerMixin(BaseMixin):
    # Mixin that runs the shared test cases against SyncManager proxies:
    # the real objects live in a single shared manager process created in
    # setUpClass. Each property forwards attribute access on the instance
    # to the corresponding factory on cls.manager (e.g. self.Queue ->
    # self.manager.Queue).
    TYPE = 'manager'
    Process = multiprocessing.Process
    Queue = property(operator.attrgetter('manager.Queue'))
    JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))
    Lock = property(operator.attrgetter('manager.Lock'))
    RLock = property(operator.attrgetter('manager.RLock'))
    Semaphore = property(operator.attrgetter('manager.Semaphore'))
    BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
    Condition = property(operator.attrgetter('manager.Condition'))
    Event = property(operator.attrgetter('manager.Event'))
    Barrier = property(operator.attrgetter('manager.Barrier'))
    Value = property(operator.attrgetter('manager.Value'))
    Array = property(operator.attrgetter('manager.Array'))
    list = property(operator.attrgetter('manager.list'))
    dict = property(operator.attrgetter('manager.dict'))
    Namespace = property(operator.attrgetter('manager.Namespace'))

    @classmethod
    def Pool(cls, *args, **kwds):
        # Pool needs arguments, so a property will not do.
        return cls.manager.Pool(*args, **kwds)

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        cls.manager = multiprocessing.Manager()

    @classmethod
    def tearDownClass(cls):
        # only the manager process should be returned by active_children()
        # but this can take a bit on slow machines, so wait a few seconds
        # if there are other children too (see #17395)
        start_time = time.monotonic()
        t = 0.01  # exponential back-off, doubled each iteration
        while len(multiprocessing.active_children()) > 1:
            time.sleep(t)
            t *= 2
            dt = time.monotonic() - start_time
            if dt >= 5.0:
                test.support.environment_altered = True
                print("Warning -- multiprocessing.Manager still has %s active "
                      "children after %s seconds"
                      % (multiprocessing.active_children(), dt),
                      file=sys.stderr)
                break

        gc.collect()                       # do garbage collection
        if cls.manager._number_of_objects() != 0:
            # This is not really an error since some tests do not
            # ensure that all processes which hold a reference to a
            # managed object have been joined.
            test.support.environment_altered = True
            print('Warning -- Shared objects which still exist at manager '
                  'shutdown:')
            print(cls.manager._debug_info())
        cls.manager.shutdown()
        cls.manager.join()
        cls.manager = None

        super().tearDownClass()
4933
4934
class ThreadsMixin(BaseMixin):
    # Mixin that runs the shared test cases against multiprocessing.dummy,
    # which implements the multiprocessing API on top of threads.
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    connection = multiprocessing.dummy.connection
    current_process = staticmethod(multiprocessing.dummy.current_process)
    active_children = staticmethod(multiprocessing.dummy.active_children)
    Pool = staticmethod(multiprocessing.dummy.Pool)
    Pipe = staticmethod(multiprocessing.dummy.Pipe)
    Queue = staticmethod(multiprocessing.dummy.Queue)
    JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)
    Lock = staticmethod(multiprocessing.dummy.Lock)
    RLock = staticmethod(multiprocessing.dummy.RLock)
    Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
    BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
    Condition = staticmethod(multiprocessing.dummy.Condition)
    Event = staticmethod(multiprocessing.dummy.Event)
    Barrier = staticmethod(multiprocessing.dummy.Barrier)
    Value = staticmethod(multiprocessing.dummy.Value)
    Array = staticmethod(multiprocessing.dummy.Array)
4954
4955#
4956# Functions used to create test cases from the base ones in this module
4957#
4958
def install_tests_in_module_dict(remote_globs, start_method):
    """Populate *remote_globs* (a test module's namespace) with TestCase
    classes generated from the base classes defined in this module,
    specialized for *start_method* ('fork', 'spawn' or 'forkserver').

    Also installs setUpModule/tearDownModule hooks that force the start
    method for the duration of the module and warn about dangling
    processes/threads afterwards.
    """
    __module__ = remote_globs['__name__']
    local_globs = globals()
    ALL_TYPES = {'processes', 'threads', 'manager'}

    for name, base in local_globs.items():
        if not isinstance(base, type):
            continue
        if issubclass(base, BaseTestCase):
            if base is BaseTestCase:
                continue
            assert set(base.ALLOWED_TYPES) <= ALL_TYPES, base.ALLOWED_TYPES
            for type_ in base.ALLOWED_TYPES:
                # e.g. _TestFoo + 'processes' -> WithProcessesTestFoo,
                # combined with the matching ProcessesMixin.
                newname = 'With' + type_.capitalize() + name[1:]
                Mixin = local_globs[type_.capitalize() + 'Mixin']
                class Temp(base, Mixin, unittest.TestCase):
                    pass
                Temp.__name__ = Temp.__qualname__ = newname
                Temp.__module__ = __module__
                remote_globs[newname] = Temp
        elif issubclass(base, unittest.TestCase):
            # Plain TestCases are re-exported under a fresh subclass so each
            # generated module gets its own class object.
            class Temp(base, object):
                pass
            Temp.__name__ = Temp.__qualname__ = name
            Temp.__module__ = __module__
            remote_globs[name] = Temp

    # One-element lists so the nested functions below can rebind the values.
    dangling = [None, None]
    old_start_method = [None]

    def setUpModule():
        multiprocessing.set_forkserver_preload(PRELOAD)
        multiprocessing.process._cleanup()
        # Snapshot pre-existing dangling processes/threads so teardown only
        # reports new ones.
        dangling[0] = multiprocessing.process._dangling.copy()
        dangling[1] = threading._dangling.copy()
        old_start_method[0] = multiprocessing.get_start_method(allow_none=True)
        try:
            multiprocessing.set_start_method(start_method, force=True)
        except ValueError:
            raise unittest.SkipTest(start_method +
                                    ' start method not supported')

        if sys.platform.startswith("linux"):
            try:
                # `lock` is unused on purpose: creating it is the check.
                lock = multiprocessing.RLock()
            except OSError:
                raise unittest.SkipTest("OSError raises on RLock creation, "
                                        "see issue 3111!")
        check_enough_semaphores()
        util.get_temp_dir()     # creates temp directory
        multiprocessing.get_logger().setLevel(LOG_LEVEL)

    def tearDownModule():
        need_sleep = False

        # bpo-26762: Some multiprocessing objects like Pool create reference
        # cycles. Trigger a garbage collection to break these cycles.
        test.support.gc_collect()

        multiprocessing.set_start_method(old_start_method[0], force=True)
        # pause a bit so we don't get warning about dangling threads/processes
        processes = set(multiprocessing.process._dangling) - set(dangling[0])
        if processes:
            need_sleep = True
            test.support.environment_altered = True
            print('Warning -- Dangling processes: %s' % processes,
                  file=sys.stderr)
        processes = None

        threads = set(threading._dangling) - set(dangling[1])
        if threads:
            need_sleep = True
            test.support.environment_altered = True
            print('Warning -- Dangling threads: %s' % threads,
                  file=sys.stderr)
        threads = None

        # Sleep 500 ms to give time to child processes to complete.
        if need_sleep:
            time.sleep(0.5)
        multiprocessing.process._cleanup()
        test.support.gc_collect()

    remote_globs['setUpModule'] = setUpModule
    remote_globs['tearDownModule'] = tearDownModule
5044