• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#
2# Unit tests for the multiprocessing package
3#
4
5import unittest
6import unittest.mock
7import queue as pyqueue
8import time
9import io
10import itertools
11import sys
12import os
13import gc
14import errno
15import signal
16import array
17import socket
18import random
19import logging
20import subprocess
21import struct
22import operator
23import pickle
24import weakref
25import warnings
26import test.support
27import test.support.script_helper
28from test import support
29
30
31# Skip tests if _multiprocessing wasn't built.
32_multiprocessing = test.support.import_module('_multiprocessing')
33# Skip tests if sem_open implementation is broken.
34test.support.import_module('multiprocessing.synchronize')
35import threading
36
37import multiprocessing.connection
38import multiprocessing.dummy
39import multiprocessing.heap
40import multiprocessing.managers
41import multiprocessing.pool
42import multiprocessing.queues
43
44from multiprocessing import util
45
46try:
47    from multiprocessing import reduction
48    HAS_REDUCTION = reduction.HAVE_SEND_HANDLE
49except ImportError:
50    HAS_REDUCTION = False
51
52try:
53    from multiprocessing.sharedctypes import Value, copy
54    HAS_SHAREDCTYPES = True
55except ImportError:
56    HAS_SHAREDCTYPES = False
57
58try:
59    from multiprocessing import shared_memory
60    HAS_SHMEM = True
61except ImportError:
62    HAS_SHMEM = False
63
64try:
65    import msvcrt
66except ImportError:
67    msvcrt = None
68
69#
70#
71#
72
73# Timeout to wait until a process completes
74TIMEOUT = 60.0 # seconds
75
76def latin(s):
77    return s.encode('latin')
78
79
80def close_queue(queue):
81    if isinstance(queue, multiprocessing.queues.Queue):
82        queue.close()
83        queue.join_thread()
84
85
86def join_process(process):
87    # Since multiprocessing.Process has the same API than threading.Thread
88    # (join() and is_alive(), the support function can be reused
89    support.join_thread(process, timeout=TIMEOUT)
90
91
92if os.name == "posix":
93    from multiprocessing import resource_tracker
94
95    def _resource_unlink(name, rtype):
96        resource_tracker._CLEANUP_FUNCS[rtype](name)
97
98
99#
100# Constants
101#
102
103LOG_LEVEL = util.SUBWARNING
104#LOG_LEVEL = logging.DEBUG
105
106DELTA = 0.1
107CHECK_TIMINGS = False     # making true makes tests take a lot longer
108                          # and can sometimes cause some non-serious
109                          # failures because some calls block a bit
110                          # longer than expected
111if CHECK_TIMINGS:
112    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
113else:
114    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
115
116HAVE_GETVALUE = not getattr(_multiprocessing,
117                            'HAVE_BROKEN_SEM_GETVALUE', False)
118
119WIN32 = (sys.platform == "win32")
120
121from multiprocessing.connection import wait
122
123def wait_for_handle(handle, timeout):
124    if timeout is not None and timeout < 0.0:
125        timeout = None
126    return wait([handle], timeout)
127
128try:
129    MAXFD = os.sysconf("SC_OPEN_MAX")
130except:
131    MAXFD = 256
132
133# To speed up tests when using the forkserver, we can preload these:
134PRELOAD = ['__main__', 'test.test_multiprocessing_forkserver']
135
136#
137# Some tests require ctypes
138#
139
140try:
141    from ctypes import Structure, c_int, c_double, c_longlong
142except ImportError:
143    Structure = object
144    c_int = c_double = c_longlong = None
145
146
147def check_enough_semaphores():
148    """Check that the system supports enough semaphores to run the test."""
149    # minimum number of semaphores available according to POSIX
150    nsems_min = 256
151    try:
152        nsems = os.sysconf("SC_SEM_NSEMS_MAX")
153    except (AttributeError, ValueError):
154        # sysconf not available or setting not available
155        return
156    if nsems == -1 or nsems >= nsems_min:
157        return
158    raise unittest.SkipTest("The OS doesn't support enough semaphores "
159                            "to run the test (required: %d)." % nsems_min)
160
161
162#
163# Creates a wrapper for a function which records the time it takes to finish
164#
165
166class TimingWrapper(object):
167
168    def __init__(self, func):
169        self.func = func
170        self.elapsed = None
171
172    def __call__(self, *args, **kwds):
173        t = time.monotonic()
174        try:
175            return self.func(*args, **kwds)
176        finally:
177            self.elapsed = time.monotonic() - t
178
179#
180# Base class for test cases
181#
182
183class BaseTestCase(object):
184
185    ALLOWED_TYPES = ('processes', 'manager', 'threads')
186
187    def assertTimingAlmostEqual(self, a, b):
188        if CHECK_TIMINGS:
189            self.assertAlmostEqual(a, b, 1)
190
191    def assertReturnsIfImplemented(self, value, func, *args):
192        try:
193            res = func(*args)
194        except NotImplementedError:
195            pass
196        else:
197            return self.assertEqual(value, res)
198
199    # For the sanity of Windows users, rather than crashing or freezing in
200    # multiple ways.
201    def __reduce__(self, *args):
202        raise NotImplementedError("shouldn't try to pickle a test case")
203
204    __reduce_ex__ = __reduce__
205
206#
207# Return the value of a semaphore
208#
209
210def get_value(self):
211    try:
212        return self.get_value()
213    except AttributeError:
214        try:
215            return self._Semaphore__value
216        except AttributeError:
217            try:
218                return self._value
219            except AttributeError:
220                raise NotImplementedError
221
222#
223# Testcases
224#
225
226class DummyCallable:
227    def __call__(self, q, c):
228        assert isinstance(c, DummyCallable)
229        q.put(5)
230
231
232class _TestProcess(BaseTestCase):
233
234    ALLOWED_TYPES = ('processes', 'threads')
235
236    def test_current(self):
237        if self.TYPE == 'threads':
238            self.skipTest('test not appropriate for {}'.format(self.TYPE))
239
240        current = self.current_process()
241        authkey = current.authkey
242
243        self.assertTrue(current.is_alive())
244        self.assertTrue(not current.daemon)
245        self.assertIsInstance(authkey, bytes)
246        self.assertTrue(len(authkey) > 0)
247        self.assertEqual(current.ident, os.getpid())
248        self.assertEqual(current.exitcode, None)
249
250    def test_daemon_argument(self):
251        if self.TYPE == "threads":
252            self.skipTest('test not appropriate for {}'.format(self.TYPE))
253
254        # By default uses the current process's daemon flag.
255        proc0 = self.Process(target=self._test)
256        self.assertEqual(proc0.daemon, self.current_process().daemon)
257        proc1 = self.Process(target=self._test, daemon=True)
258        self.assertTrue(proc1.daemon)
259        proc2 = self.Process(target=self._test, daemon=False)
260        self.assertFalse(proc2.daemon)
261
262    @classmethod
263    def _test(cls, q, *args, **kwds):
264        current = cls.current_process()
265        q.put(args)
266        q.put(kwds)
267        q.put(current.name)
268        if cls.TYPE != 'threads':
269            q.put(bytes(current.authkey))
270            q.put(current.pid)
271
272    def test_parent_process_attributes(self):
273        if self.TYPE == "threads":
274            self.skipTest('test not appropriate for {}'.format(self.TYPE))
275
276        self.assertIsNone(self.parent_process())
277
278        rconn, wconn = self.Pipe(duplex=False)
279        p = self.Process(target=self._test_send_parent_process, args=(wconn,))
280        p.start()
281        p.join()
282        parent_pid, parent_name = rconn.recv()
283        self.assertEqual(parent_pid, self.current_process().pid)
284        self.assertEqual(parent_pid, os.getpid())
285        self.assertEqual(parent_name, self.current_process().name)
286
287    @classmethod
288    def _test_send_parent_process(cls, wconn):
289        from multiprocessing.process import parent_process
290        wconn.send([parent_process().pid, parent_process().name])
291
292    def test_parent_process(self):
293        if self.TYPE == "threads":
294            self.skipTest('test not appropriate for {}'.format(self.TYPE))
295
296        # Launch a child process. Make it launch a grandchild process. Kill the
297        # child process and make sure that the grandchild notices the death of
298        # its parent (a.k.a the child process).
299        rconn, wconn = self.Pipe(duplex=False)
300        p = self.Process(
301            target=self._test_create_grandchild_process, args=(wconn, ))
302        p.start()
303
304        if not rconn.poll(timeout=60):
305            raise AssertionError("Could not communicate with child process")
306        parent_process_status = rconn.recv()
307        self.assertEqual(parent_process_status, "alive")
308
309        p.terminate()
310        p.join()
311
312        if not rconn.poll(timeout=60):
313            raise AssertionError("Could not communicate with child process")
314        parent_process_status = rconn.recv()
315        self.assertEqual(parent_process_status, "not alive")
316
317    @classmethod
318    def _test_create_grandchild_process(cls, wconn):
319        p = cls.Process(target=cls._test_report_parent_status, args=(wconn, ))
320        p.start()
321        time.sleep(300)
322
323    @classmethod
324    def _test_report_parent_status(cls, wconn):
325        from multiprocessing.process import parent_process
326        wconn.send("alive" if parent_process().is_alive() else "not alive")
327        parent_process().join(timeout=5)
328        wconn.send("alive" if parent_process().is_alive() else "not alive")
329
330    def test_process(self):
331        q = self.Queue(1)
332        e = self.Event()
333        args = (q, 1, 2)
334        kwargs = {'hello':23, 'bye':2.54}
335        name = 'SomeProcess'
336        p = self.Process(
337            target=self._test, args=args, kwargs=kwargs, name=name
338            )
339        p.daemon = True
340        current = self.current_process()
341
342        if self.TYPE != 'threads':
343            self.assertEqual(p.authkey, current.authkey)
344        self.assertEqual(p.is_alive(), False)
345        self.assertEqual(p.daemon, True)
346        self.assertNotIn(p, self.active_children())
347        self.assertTrue(type(self.active_children()) is list)
348        self.assertEqual(p.exitcode, None)
349
350        p.start()
351
352        self.assertEqual(p.exitcode, None)
353        self.assertEqual(p.is_alive(), True)
354        self.assertIn(p, self.active_children())
355
356        self.assertEqual(q.get(), args[1:])
357        self.assertEqual(q.get(), kwargs)
358        self.assertEqual(q.get(), p.name)
359        if self.TYPE != 'threads':
360            self.assertEqual(q.get(), current.authkey)
361            self.assertEqual(q.get(), p.pid)
362
363        p.join()
364
365        self.assertEqual(p.exitcode, 0)
366        self.assertEqual(p.is_alive(), False)
367        self.assertNotIn(p, self.active_children())
368        close_queue(q)
369
370    @unittest.skipUnless(threading._HAVE_THREAD_NATIVE_ID, "needs native_id")
371    def test_process_mainthread_native_id(self):
372        if self.TYPE == 'threads':
373            self.skipTest('test not appropriate for {}'.format(self.TYPE))
374
375        current_mainthread_native_id = threading.main_thread().native_id
376
377        q = self.Queue(1)
378        p = self.Process(target=self._test_process_mainthread_native_id, args=(q,))
379        p.start()
380
381        child_mainthread_native_id = q.get()
382        p.join()
383        close_queue(q)
384
385        self.assertNotEqual(current_mainthread_native_id, child_mainthread_native_id)
386
387    @classmethod
388    def _test_process_mainthread_native_id(cls, q):
389        mainthread_native_id = threading.main_thread().native_id
390        q.put(mainthread_native_id)
391
392    @classmethod
393    def _sleep_some(cls):
394        time.sleep(100)
395
396    @classmethod
397    def _test_sleep(cls, delay):
398        time.sleep(delay)
399
400    def _kill_process(self, meth):
401        if self.TYPE == 'threads':
402            self.skipTest('test not appropriate for {}'.format(self.TYPE))
403
404        p = self.Process(target=self._sleep_some)
405        p.daemon = True
406        p.start()
407
408        self.assertEqual(p.is_alive(), True)
409        self.assertIn(p, self.active_children())
410        self.assertEqual(p.exitcode, None)
411
412        join = TimingWrapper(p.join)
413
414        self.assertEqual(join(0), None)
415        self.assertTimingAlmostEqual(join.elapsed, 0.0)
416        self.assertEqual(p.is_alive(), True)
417
418        self.assertEqual(join(-1), None)
419        self.assertTimingAlmostEqual(join.elapsed, 0.0)
420        self.assertEqual(p.is_alive(), True)
421
422        # XXX maybe terminating too soon causes the problems on Gentoo...
423        time.sleep(1)
424
425        meth(p)
426
427        if hasattr(signal, 'alarm'):
428            # On the Gentoo buildbot waitpid() often seems to block forever.
429            # We use alarm() to interrupt it if it blocks for too long.
430            def handler(*args):
431                raise RuntimeError('join took too long: %s' % p)
432            old_handler = signal.signal(signal.SIGALRM, handler)
433            try:
434                signal.alarm(10)
435                self.assertEqual(join(), None)
436            finally:
437                signal.alarm(0)
438                signal.signal(signal.SIGALRM, old_handler)
439        else:
440            self.assertEqual(join(), None)
441
442        self.assertTimingAlmostEqual(join.elapsed, 0.0)
443
444        self.assertEqual(p.is_alive(), False)
445        self.assertNotIn(p, self.active_children())
446
447        p.join()
448
449        return p.exitcode
450
451    def test_terminate(self):
452        exitcode = self._kill_process(multiprocessing.Process.terminate)
453        if os.name != 'nt':
454            self.assertEqual(exitcode, -signal.SIGTERM)
455
456    def test_kill(self):
457        exitcode = self._kill_process(multiprocessing.Process.kill)
458        if os.name != 'nt':
459            self.assertEqual(exitcode, -signal.SIGKILL)
460
461    def test_cpu_count(self):
462        try:
463            cpus = multiprocessing.cpu_count()
464        except NotImplementedError:
465            cpus = 1
466        self.assertTrue(type(cpus) is int)
467        self.assertTrue(cpus >= 1)
468
469    def test_active_children(self):
470        self.assertEqual(type(self.active_children()), list)
471
472        p = self.Process(target=time.sleep, args=(DELTA,))
473        self.assertNotIn(p, self.active_children())
474
475        p.daemon = True
476        p.start()
477        self.assertIn(p, self.active_children())
478
479        p.join()
480        self.assertNotIn(p, self.active_children())
481
482    @classmethod
483    def _test_recursion(cls, wconn, id):
484        wconn.send(id)
485        if len(id) < 2:
486            for i in range(2):
487                p = cls.Process(
488                    target=cls._test_recursion, args=(wconn, id+[i])
489                    )
490                p.start()
491                p.join()
492
493    def test_recursion(self):
494        rconn, wconn = self.Pipe(duplex=False)
495        self._test_recursion(wconn, [])
496
497        time.sleep(DELTA)
498        result = []
499        while rconn.poll():
500            result.append(rconn.recv())
501
502        expected = [
503            [],
504              [0],
505                [0, 0],
506                [0, 1],
507              [1],
508                [1, 0],
509                [1, 1]
510            ]
511        self.assertEqual(result, expected)
512
513    @classmethod
514    def _test_sentinel(cls, event):
515        event.wait(10.0)
516
517    def test_sentinel(self):
518        if self.TYPE == "threads":
519            self.skipTest('test not appropriate for {}'.format(self.TYPE))
520        event = self.Event()
521        p = self.Process(target=self._test_sentinel, args=(event,))
522        with self.assertRaises(ValueError):
523            p.sentinel
524        p.start()
525        self.addCleanup(p.join)
526        sentinel = p.sentinel
527        self.assertIsInstance(sentinel, int)
528        self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
529        event.set()
530        p.join()
531        self.assertTrue(wait_for_handle(sentinel, timeout=1))
532
533    @classmethod
534    def _test_close(cls, rc=0, q=None):
535        if q is not None:
536            q.get()
537        sys.exit(rc)
538
539    def test_close(self):
540        if self.TYPE == "threads":
541            self.skipTest('test not appropriate for {}'.format(self.TYPE))
542        q = self.Queue()
543        p = self.Process(target=self._test_close, kwargs={'q': q})
544        p.daemon = True
545        p.start()
546        self.assertEqual(p.is_alive(), True)
547        # Child is still alive, cannot close
548        with self.assertRaises(ValueError):
549            p.close()
550
551        q.put(None)
552        p.join()
553        self.assertEqual(p.is_alive(), False)
554        self.assertEqual(p.exitcode, 0)
555        p.close()
556        with self.assertRaises(ValueError):
557            p.is_alive()
558        with self.assertRaises(ValueError):
559            p.join()
560        with self.assertRaises(ValueError):
561            p.terminate()
562        p.close()
563
564        wr = weakref.ref(p)
565        del p
566        gc.collect()
567        self.assertIs(wr(), None)
568
569        close_queue(q)
570
571    def test_many_processes(self):
572        if self.TYPE == 'threads':
573            self.skipTest('test not appropriate for {}'.format(self.TYPE))
574
575        sm = multiprocessing.get_start_method()
576        N = 5 if sm == 'spawn' else 100
577
578        # Try to overwhelm the forkserver loop with events
579        procs = [self.Process(target=self._test_sleep, args=(0.01,))
580                 for i in range(N)]
581        for p in procs:
582            p.start()
583        for p in procs:
584            join_process(p)
585        for p in procs:
586            self.assertEqual(p.exitcode, 0)
587
588        procs = [self.Process(target=self._sleep_some)
589                 for i in range(N)]
590        for p in procs:
591            p.start()
592        time.sleep(0.001)  # let the children start...
593        for p in procs:
594            p.terminate()
595        for p in procs:
596            join_process(p)
597        if os.name != 'nt':
598            exitcodes = [-signal.SIGTERM]
599            if sys.platform == 'darwin':
600                # bpo-31510: On macOS, killing a freshly started process with
601                # SIGTERM sometimes kills the process with SIGKILL.
602                exitcodes.append(-signal.SIGKILL)
603            for p in procs:
604                self.assertIn(p.exitcode, exitcodes)
605
606    def test_lose_target_ref(self):
607        c = DummyCallable()
608        wr = weakref.ref(c)
609        q = self.Queue()
610        p = self.Process(target=c, args=(q, c))
611        del c
612        p.start()
613        p.join()
614        self.assertIs(wr(), None)
615        self.assertEqual(q.get(), 5)
616        close_queue(q)
617
618    @classmethod
619    def _test_child_fd_inflation(self, evt, q):
620        q.put(test.support.fd_count())
621        evt.wait()
622
623    def test_child_fd_inflation(self):
624        # Number of fds in child processes should not grow with the
625        # number of running children.
626        if self.TYPE == 'threads':
627            self.skipTest('test not appropriate for {}'.format(self.TYPE))
628
629        sm = multiprocessing.get_start_method()
630        if sm == 'fork':
631            # The fork method by design inherits all fds from the parent,
632            # trying to go against it is a lost battle
633            self.skipTest('test not appropriate for {}'.format(sm))
634
635        N = 5
636        evt = self.Event()
637        q = self.Queue()
638
639        procs = [self.Process(target=self._test_child_fd_inflation, args=(evt, q))
640                 for i in range(N)]
641        for p in procs:
642            p.start()
643
644        try:
645            fd_counts = [q.get() for i in range(N)]
646            self.assertEqual(len(set(fd_counts)), 1, fd_counts)
647
648        finally:
649            evt.set()
650            for p in procs:
651                p.join()
652            close_queue(q)
653
654    @classmethod
655    def _test_wait_for_threads(self, evt):
656        def func1():
657            time.sleep(0.5)
658            evt.set()
659
660        def func2():
661            time.sleep(20)
662            evt.clear()
663
664        threading.Thread(target=func1).start()
665        threading.Thread(target=func2, daemon=True).start()
666
667    def test_wait_for_threads(self):
668        # A child process should wait for non-daemonic threads to end
669        # before exiting
670        if self.TYPE == 'threads':
671            self.skipTest('test not appropriate for {}'.format(self.TYPE))
672
673        evt = self.Event()
674        proc = self.Process(target=self._test_wait_for_threads, args=(evt,))
675        proc.start()
676        proc.join()
677        self.assertTrue(evt.is_set())
678
679    @classmethod
680    def _test_error_on_stdio_flush(self, evt, break_std_streams={}):
681        for stream_name, action in break_std_streams.items():
682            if action == 'close':
683                stream = io.StringIO()
684                stream.close()
685            else:
686                assert action == 'remove'
687                stream = None
688            setattr(sys, stream_name, None)
689        evt.set()
690
691    def test_error_on_stdio_flush_1(self):
692        # Check that Process works with broken standard streams
693        streams = [io.StringIO(), None]
694        streams[0].close()
695        for stream_name in ('stdout', 'stderr'):
696            for stream in streams:
697                old_stream = getattr(sys, stream_name)
698                setattr(sys, stream_name, stream)
699                try:
700                    evt = self.Event()
701                    proc = self.Process(target=self._test_error_on_stdio_flush,
702                                        args=(evt,))
703                    proc.start()
704                    proc.join()
705                    self.assertTrue(evt.is_set())
706                    self.assertEqual(proc.exitcode, 0)
707                finally:
708                    setattr(sys, stream_name, old_stream)
709
710    def test_error_on_stdio_flush_2(self):
711        # Same as test_error_on_stdio_flush_1(), but standard streams are
712        # broken by the child process
713        for stream_name in ('stdout', 'stderr'):
714            for action in ('close', 'remove'):
715                old_stream = getattr(sys, stream_name)
716                try:
717                    evt = self.Event()
718                    proc = self.Process(target=self._test_error_on_stdio_flush,
719                                        args=(evt, {stream_name: action}))
720                    proc.start()
721                    proc.join()
722                    self.assertTrue(evt.is_set())
723                    self.assertEqual(proc.exitcode, 0)
724                finally:
725                    setattr(sys, stream_name, old_stream)
726
727    @classmethod
728    def _sleep_and_set_event(self, evt, delay=0.0):
729        time.sleep(delay)
730        evt.set()
731
732    def check_forkserver_death(self, signum):
733        # bpo-31308: if the forkserver process has died, we should still
734        # be able to create and run new Process instances (the forkserver
735        # is implicitly restarted).
736        if self.TYPE == 'threads':
737            self.skipTest('test not appropriate for {}'.format(self.TYPE))
738        sm = multiprocessing.get_start_method()
739        if sm != 'forkserver':
740            # The fork method by design inherits all fds from the parent,
741            # trying to go against it is a lost battle
742            self.skipTest('test not appropriate for {}'.format(sm))
743
744        from multiprocessing.forkserver import _forkserver
745        _forkserver.ensure_running()
746
747        # First process sleeps 500 ms
748        delay = 0.5
749
750        evt = self.Event()
751        proc = self.Process(target=self._sleep_and_set_event, args=(evt, delay))
752        proc.start()
753
754        pid = _forkserver._forkserver_pid
755        os.kill(pid, signum)
756        # give time to the fork server to die and time to proc to complete
757        time.sleep(delay * 2.0)
758
759        evt2 = self.Event()
760        proc2 = self.Process(target=self._sleep_and_set_event, args=(evt2,))
761        proc2.start()
762        proc2.join()
763        self.assertTrue(evt2.is_set())
764        self.assertEqual(proc2.exitcode, 0)
765
766        proc.join()
767        self.assertTrue(evt.is_set())
768        self.assertIn(proc.exitcode, (0, 255))
769
770    def test_forkserver_sigint(self):
771        # Catchable signal
772        self.check_forkserver_death(signal.SIGINT)
773
774    def test_forkserver_sigkill(self):
775        # Uncatchable signal
776        if os.name != 'nt':
777            self.check_forkserver_death(signal.SIGKILL)
778
779
780#
781#
782#
783
784class _UpperCaser(multiprocessing.Process):
785
786    def __init__(self):
787        multiprocessing.Process.__init__(self)
788        self.child_conn, self.parent_conn = multiprocessing.Pipe()
789
790    def run(self):
791        self.parent_conn.close()
792        for s in iter(self.child_conn.recv, None):
793            self.child_conn.send(s.upper())
794        self.child_conn.close()
795
796    def submit(self, s):
797        assert type(s) is str
798        self.parent_conn.send(s)
799        return self.parent_conn.recv()
800
801    def stop(self):
802        self.parent_conn.send(None)
803        self.parent_conn.close()
804        self.child_conn.close()
805
806class _TestSubclassingProcess(BaseTestCase):
807
808    ALLOWED_TYPES = ('processes',)
809
810    def test_subclassing(self):
811        uppercaser = _UpperCaser()
812        uppercaser.daemon = True
813        uppercaser.start()
814        self.assertEqual(uppercaser.submit('hello'), 'HELLO')
815        self.assertEqual(uppercaser.submit('world'), 'WORLD')
816        uppercaser.stop()
817        uppercaser.join()
818
819    def test_stderr_flush(self):
820        # sys.stderr is flushed at process shutdown (issue #13812)
821        if self.TYPE == "threads":
822            self.skipTest('test not appropriate for {}'.format(self.TYPE))
823
824        testfn = test.support.TESTFN
825        self.addCleanup(test.support.unlink, testfn)
826        proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
827        proc.start()
828        proc.join()
829        with open(testfn, 'r') as f:
830            err = f.read()
831            # The whole traceback was printed
832            self.assertIn("ZeroDivisionError", err)
833            self.assertIn("test_multiprocessing.py", err)
834            self.assertIn("1/0 # MARKER", err)
835
836    @classmethod
837    def _test_stderr_flush(cls, testfn):
838        fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
839        sys.stderr = open(fd, 'w', closefd=False)
840        1/0 # MARKER
841
842
843    @classmethod
844    def _test_sys_exit(cls, reason, testfn):
845        fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
846        sys.stderr = open(fd, 'w', closefd=False)
847        sys.exit(reason)
848
849    def test_sys_exit(self):
850        # See Issue 13854
851        if self.TYPE == 'threads':
852            self.skipTest('test not appropriate for {}'.format(self.TYPE))
853
854        testfn = test.support.TESTFN
855        self.addCleanup(test.support.unlink, testfn)
856
857        for reason in (
858            [1, 2, 3],
859            'ignore this',
860        ):
861            p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
862            p.daemon = True
863            p.start()
864            join_process(p)
865            self.assertEqual(p.exitcode, 1)
866
867            with open(testfn, 'r') as f:
868                content = f.read()
869            self.assertEqual(content.rstrip(), str(reason))
870
871            os.unlink(testfn)
872
873        for reason in (True, False, 8):
874            p = self.Process(target=sys.exit, args=(reason,))
875            p.daemon = True
876            p.start()
877            join_process(p)
878            self.assertEqual(p.exitcode, reason)
879
880#
881#
882#
883
884def queue_empty(q):
885    if hasattr(q, 'empty'):
886        return q.empty()
887    else:
888        return q.qsize() == 0
889
890def queue_full(q, maxsize):
891    if hasattr(q, 'full'):
892        return q.full()
893    else:
894        return q.qsize() == maxsize
895
896
897class _TestQueue(BaseTestCase):
898
899
900    @classmethod
901    def _test_put(cls, queue, child_can_start, parent_can_continue):
902        child_can_start.wait()
903        for i in range(6):
904            queue.get()
905        parent_can_continue.set()
906
907    def test_put(self):
908        MAXSIZE = 6
909        queue = self.Queue(maxsize=MAXSIZE)
910        child_can_start = self.Event()
911        parent_can_continue = self.Event()
912
913        proc = self.Process(
914            target=self._test_put,
915            args=(queue, child_can_start, parent_can_continue)
916            )
917        proc.daemon = True
918        proc.start()
919
920        self.assertEqual(queue_empty(queue), True)
921        self.assertEqual(queue_full(queue, MAXSIZE), False)
922
923        queue.put(1)
924        queue.put(2, True)
925        queue.put(3, True, None)
926        queue.put(4, False)
927        queue.put(5, False, None)
928        queue.put_nowait(6)
929
930        # the values may be in buffer but not yet in pipe so sleep a bit
931        time.sleep(DELTA)
932
933        self.assertEqual(queue_empty(queue), False)
934        self.assertEqual(queue_full(queue, MAXSIZE), True)
935
936        put = TimingWrapper(queue.put)
937        put_nowait = TimingWrapper(queue.put_nowait)
938
939        self.assertRaises(pyqueue.Full, put, 7, False)
940        self.assertTimingAlmostEqual(put.elapsed, 0)
941
942        self.assertRaises(pyqueue.Full, put, 7, False, None)
943        self.assertTimingAlmostEqual(put.elapsed, 0)
944
945        self.assertRaises(pyqueue.Full, put_nowait, 7)
946        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
947
948        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
949        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
950
951        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
952        self.assertTimingAlmostEqual(put.elapsed, 0)
953
954        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
955        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
956
957        child_can_start.set()
958        parent_can_continue.wait()
959
960        self.assertEqual(queue_empty(queue), True)
961        self.assertEqual(queue_full(queue, MAXSIZE), False)
962
963        proc.join()
964        close_queue(queue)
965
966    @classmethod
967    def _test_get(cls, queue, child_can_start, parent_can_continue):
968        child_can_start.wait()
969        #queue.put(1)
970        queue.put(2)
971        queue.put(3)
972        queue.put(4)
973        queue.put(5)
974        parent_can_continue.set()
975
976    def test_get(self):
977        queue = self.Queue()
978        child_can_start = self.Event()
979        parent_can_continue = self.Event()
980
981        proc = self.Process(
982            target=self._test_get,
983            args=(queue, child_can_start, parent_can_continue)
984            )
985        proc.daemon = True
986        proc.start()
987
988        self.assertEqual(queue_empty(queue), True)
989
990        child_can_start.set()
991        parent_can_continue.wait()
992
993        time.sleep(DELTA)
994        self.assertEqual(queue_empty(queue), False)
995
996        # Hangs unexpectedly, remove for now
997        #self.assertEqual(queue.get(), 1)
998        self.assertEqual(queue.get(True, None), 2)
999        self.assertEqual(queue.get(True), 3)
1000        self.assertEqual(queue.get(timeout=1), 4)
1001        self.assertEqual(queue.get_nowait(), 5)
1002
1003        self.assertEqual(queue_empty(queue), True)
1004
1005        get = TimingWrapper(queue.get)
1006        get_nowait = TimingWrapper(queue.get_nowait)
1007
1008        self.assertRaises(pyqueue.Empty, get, False)
1009        self.assertTimingAlmostEqual(get.elapsed, 0)
1010
1011        self.assertRaises(pyqueue.Empty, get, False, None)
1012        self.assertTimingAlmostEqual(get.elapsed, 0)
1013
1014        self.assertRaises(pyqueue.Empty, get_nowait)
1015        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
1016
1017        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
1018        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1019
1020        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
1021        self.assertTimingAlmostEqual(get.elapsed, 0)
1022
1023        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
1024        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
1025
1026        proc.join()
1027        close_queue(queue)
1028
1029    @classmethod
1030    def _test_fork(cls, queue):
1031        for i in range(10, 20):
1032            queue.put(i)
1033        # note that at this point the items may only be buffered, so the
1034        # process cannot shutdown until the feeder thread has finished
1035        # pushing items onto the pipe.
1036
1037    def test_fork(self):
1038        # Old versions of Queue would fail to create a new feeder
1039        # thread for a forked process if the original process had its
1040        # own feeder thread.  This test checks that this no longer
1041        # happens.
1042
1043        queue = self.Queue()
1044
1045        # put items on queue so that main process starts a feeder thread
1046        for i in range(10):
1047            queue.put(i)
1048
1049        # wait to make sure thread starts before we fork a new process
1050        time.sleep(DELTA)
1051
1052        # fork process
1053        p = self.Process(target=self._test_fork, args=(queue,))
1054        p.daemon = True
1055        p.start()
1056
1057        # check that all expected items are in the queue
1058        for i in range(20):
1059            self.assertEqual(queue.get(), i)
1060        self.assertRaises(pyqueue.Empty, queue.get, False)
1061
1062        p.join()
1063        close_queue(queue)
1064
1065    def test_qsize(self):
1066        q = self.Queue()
1067        try:
1068            self.assertEqual(q.qsize(), 0)
1069        except NotImplementedError:
1070            self.skipTest('qsize method not implemented')
1071        q.put(1)
1072        self.assertEqual(q.qsize(), 1)
1073        q.put(5)
1074        self.assertEqual(q.qsize(), 2)
1075        q.get()
1076        self.assertEqual(q.qsize(), 1)
1077        q.get()
1078        self.assertEqual(q.qsize(), 0)
1079        close_queue(q)
1080
1081    @classmethod
1082    def _test_task_done(cls, q):
1083        for obj in iter(q.get, None):
1084            time.sleep(DELTA)
1085            q.task_done()
1086
1087    def test_task_done(self):
1088        queue = self.JoinableQueue()
1089
1090        workers = [self.Process(target=self._test_task_done, args=(queue,))
1091                   for i in range(4)]
1092
1093        for p in workers:
1094            p.daemon = True
1095            p.start()
1096
1097        for i in range(10):
1098            queue.put(i)
1099
1100        queue.join()
1101
1102        for p in workers:
1103            queue.put(None)
1104
1105        for p in workers:
1106            p.join()
1107        close_queue(queue)
1108
1109    def test_no_import_lock_contention(self):
1110        with test.support.temp_cwd():
1111            module_name = 'imported_by_an_imported_module'
1112            with open(module_name + '.py', 'w') as f:
1113                f.write("""if 1:
1114                    import multiprocessing
1115
1116                    q = multiprocessing.Queue()
1117                    q.put('knock knock')
1118                    q.get(timeout=3)
1119                    q.close()
1120                    del q
1121                """)
1122
1123            with test.support.DirsOnSysPath(os.getcwd()):
1124                try:
1125                    __import__(module_name)
1126                except pyqueue.Empty:
1127                    self.fail("Probable regression on import lock contention;"
1128                              " see Issue #22853")
1129
1130    def test_timeout(self):
1131        q = multiprocessing.Queue()
1132        start = time.monotonic()
1133        self.assertRaises(pyqueue.Empty, q.get, True, 0.200)
1134        delta = time.monotonic() - start
1135        # bpo-30317: Tolerate a delta of 100 ms because of the bad clock
1136        # resolution on Windows (usually 15.6 ms). x86 Windows7 3.x once
1137        # failed because the delta was only 135.8 ms.
1138        self.assertGreaterEqual(delta, 0.100)
1139        close_queue(q)
1140
1141    def test_queue_feeder_donot_stop_onexc(self):
1142        # bpo-30414: verify feeder handles exceptions correctly
1143        if self.TYPE != 'processes':
1144            self.skipTest('test not appropriate for {}'.format(self.TYPE))
1145
1146        class NotSerializable(object):
1147            def __reduce__(self):
1148                raise AttributeError
1149        with test.support.captured_stderr():
1150            q = self.Queue()
1151            q.put(NotSerializable())
1152            q.put(True)
1153            self.assertTrue(q.get(timeout=TIMEOUT))
1154            close_queue(q)
1155
1156        with test.support.captured_stderr():
1157            # bpo-33078: verify that the queue size is correctly handled
1158            # on errors.
1159            q = self.Queue(maxsize=1)
1160            q.put(NotSerializable())
1161            q.put(True)
1162            try:
1163                self.assertEqual(q.qsize(), 1)
1164            except NotImplementedError:
1165                # qsize is not available on all platform as it
1166                # relies on sem_getvalue
1167                pass
1168            # bpo-30595: use a timeout of 1 second for slow buildbots
1169            self.assertTrue(q.get(timeout=1.0))
1170            # Check that the size of the queue is correct
1171            self.assertTrue(q.empty())
1172            close_queue(q)
1173
1174    def test_queue_feeder_on_queue_feeder_error(self):
1175        # bpo-30006: verify feeder handles exceptions using the
1176        # _on_queue_feeder_error hook.
1177        if self.TYPE != 'processes':
1178            self.skipTest('test not appropriate for {}'.format(self.TYPE))
1179
1180        class NotSerializable(object):
1181            """Mock unserializable object"""
1182            def __init__(self):
1183                self.reduce_was_called = False
1184                self.on_queue_feeder_error_was_called = False
1185
1186            def __reduce__(self):
1187                self.reduce_was_called = True
1188                raise AttributeError
1189
1190        class SafeQueue(multiprocessing.queues.Queue):
1191            """Queue with overloaded _on_queue_feeder_error hook"""
1192            @staticmethod
1193            def _on_queue_feeder_error(e, obj):
1194                if (isinstance(e, AttributeError) and
1195                        isinstance(obj, NotSerializable)):
1196                    obj.on_queue_feeder_error_was_called = True
1197
1198        not_serializable_obj = NotSerializable()
1199        # The captured_stderr reduces the noise in the test report
1200        with test.support.captured_stderr():
1201            q = SafeQueue(ctx=multiprocessing.get_context())
1202            q.put(not_serializable_obj)
1203
1204            # Verify that q is still functioning correctly
1205            q.put(True)
1206            self.assertTrue(q.get(timeout=1.0))
1207
1208        # Assert that the serialization and the hook have been called correctly
1209        self.assertTrue(not_serializable_obj.reduce_was_called)
1210        self.assertTrue(not_serializable_obj.on_queue_feeder_error_was_called)
1211
1212    def test_closed_queue_put_get_exceptions(self):
1213        for q in multiprocessing.Queue(), multiprocessing.JoinableQueue():
1214            q.close()
1215            with self.assertRaisesRegex(ValueError, 'is closed'):
1216                q.put('foo')
1217            with self.assertRaisesRegex(ValueError, 'is closed'):
1218                q.get()
1219#
1220#
1221#
1222
1223class _TestLock(BaseTestCase):
1224
1225    def test_lock(self):
1226        lock = self.Lock()
1227        self.assertEqual(lock.acquire(), True)
1228        self.assertEqual(lock.acquire(False), False)
1229        self.assertEqual(lock.release(), None)
1230        self.assertRaises((ValueError, threading.ThreadError), lock.release)
1231
1232    def test_rlock(self):
1233        lock = self.RLock()
1234        self.assertEqual(lock.acquire(), True)
1235        self.assertEqual(lock.acquire(), True)
1236        self.assertEqual(lock.acquire(), True)
1237        self.assertEqual(lock.release(), None)
1238        self.assertEqual(lock.release(), None)
1239        self.assertEqual(lock.release(), None)
1240        self.assertRaises((AssertionError, RuntimeError), lock.release)
1241
1242    def test_lock_context(self):
1243        with self.Lock():
1244            pass
1245
1246
1247class _TestSemaphore(BaseTestCase):
1248
1249    def _test_semaphore(self, sem):
1250        self.assertReturnsIfImplemented(2, get_value, sem)
1251        self.assertEqual(sem.acquire(), True)
1252        self.assertReturnsIfImplemented(1, get_value, sem)
1253        self.assertEqual(sem.acquire(), True)
1254        self.assertReturnsIfImplemented(0, get_value, sem)
1255        self.assertEqual(sem.acquire(False), False)
1256        self.assertReturnsIfImplemented(0, get_value, sem)
1257        self.assertEqual(sem.release(), None)
1258        self.assertReturnsIfImplemented(1, get_value, sem)
1259        self.assertEqual(sem.release(), None)
1260        self.assertReturnsIfImplemented(2, get_value, sem)
1261
1262    def test_semaphore(self):
1263        sem = self.Semaphore(2)
1264        self._test_semaphore(sem)
1265        self.assertEqual(sem.release(), None)
1266        self.assertReturnsIfImplemented(3, get_value, sem)
1267        self.assertEqual(sem.release(), None)
1268        self.assertReturnsIfImplemented(4, get_value, sem)
1269
1270    def test_bounded_semaphore(self):
1271        sem = self.BoundedSemaphore(2)
1272        self._test_semaphore(sem)
1273        # Currently fails on OS/X
1274        #if HAVE_GETVALUE:
1275        #    self.assertRaises(ValueError, sem.release)
1276        #    self.assertReturnsIfImplemented(2, get_value, sem)
1277
1278    def test_timeout(self):
1279        if self.TYPE != 'processes':
1280            self.skipTest('test not appropriate for {}'.format(self.TYPE))
1281
1282        sem = self.Semaphore(0)
1283        acquire = TimingWrapper(sem.acquire)
1284
1285        self.assertEqual(acquire(False), False)
1286        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
1287
1288        self.assertEqual(acquire(False, None), False)
1289        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
1290
1291        self.assertEqual(acquire(False, TIMEOUT1), False)
1292        self.assertTimingAlmostEqual(acquire.elapsed, 0)
1293
1294        self.assertEqual(acquire(True, TIMEOUT2), False)
1295        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
1296
1297        self.assertEqual(acquire(timeout=TIMEOUT3), False)
1298        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
1299
1300
1301class _TestCondition(BaseTestCase):
1302
1303    @classmethod
1304    def f(cls, cond, sleeping, woken, timeout=None):
1305        cond.acquire()
1306        sleeping.release()
1307        cond.wait(timeout)
1308        woken.release()
1309        cond.release()
1310
1311    def assertReachesEventually(self, func, value):
1312        for i in range(10):
1313            try:
1314                if func() == value:
1315                    break
1316            except NotImplementedError:
1317                break
1318            time.sleep(DELTA)
1319        time.sleep(DELTA)
1320        self.assertReturnsIfImplemented(value, func)
1321
1322    def check_invariant(self, cond):
1323        # this is only supposed to succeed when there are no sleepers
1324        if self.TYPE == 'processes':
1325            try:
1326                sleepers = (cond._sleeping_count.get_value() -
1327                            cond._woken_count.get_value())
1328                self.assertEqual(sleepers, 0)
1329                self.assertEqual(cond._wait_semaphore.get_value(), 0)
1330            except NotImplementedError:
1331                pass
1332
1333    def test_notify(self):
1334        cond = self.Condition()
1335        sleeping = self.Semaphore(0)
1336        woken = self.Semaphore(0)
1337
1338        p = self.Process(target=self.f, args=(cond, sleeping, woken))
1339        p.daemon = True
1340        p.start()
1341        self.addCleanup(p.join)
1342
1343        p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
1344        p.daemon = True
1345        p.start()
1346        self.addCleanup(p.join)
1347
1348        # wait for both children to start sleeping
1349        sleeping.acquire()
1350        sleeping.acquire()
1351
1352        # check no process/thread has woken up
1353        time.sleep(DELTA)
1354        self.assertReturnsIfImplemented(0, get_value, woken)
1355
1356        # wake up one process/thread
1357        cond.acquire()
1358        cond.notify()
1359        cond.release()
1360
1361        # check one process/thread has woken up
1362        time.sleep(DELTA)
1363        self.assertReturnsIfImplemented(1, get_value, woken)
1364
1365        # wake up another
1366        cond.acquire()
1367        cond.notify()
1368        cond.release()
1369
1370        # check other has woken up
1371        time.sleep(DELTA)
1372        self.assertReturnsIfImplemented(2, get_value, woken)
1373
1374        # check state is not mucked up
1375        self.check_invariant(cond)
1376        p.join()
1377
1378    def test_notify_all(self):
1379        cond = self.Condition()
1380        sleeping = self.Semaphore(0)
1381        woken = self.Semaphore(0)
1382
1383        # start some threads/processes which will timeout
1384        for i in range(3):
1385            p = self.Process(target=self.f,
1386                             args=(cond, sleeping, woken, TIMEOUT1))
1387            p.daemon = True
1388            p.start()
1389            self.addCleanup(p.join)
1390
1391            t = threading.Thread(target=self.f,
1392                                 args=(cond, sleeping, woken, TIMEOUT1))
1393            t.daemon = True
1394            t.start()
1395            self.addCleanup(t.join)
1396
1397        # wait for them all to sleep
1398        for i in range(6):
1399            sleeping.acquire()
1400
1401        # check they have all timed out
1402        for i in range(6):
1403            woken.acquire()
1404        self.assertReturnsIfImplemented(0, get_value, woken)
1405
1406        # check state is not mucked up
1407        self.check_invariant(cond)
1408
1409        # start some more threads/processes
1410        for i in range(3):
1411            p = self.Process(target=self.f, args=(cond, sleeping, woken))
1412            p.daemon = True
1413            p.start()
1414            self.addCleanup(p.join)
1415
1416            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
1417            t.daemon = True
1418            t.start()
1419            self.addCleanup(t.join)
1420
1421        # wait for them to all sleep
1422        for i in range(6):
1423            sleeping.acquire()
1424
1425        # check no process/thread has woken up
1426        time.sleep(DELTA)
1427        self.assertReturnsIfImplemented(0, get_value, woken)
1428
1429        # wake them all up
1430        cond.acquire()
1431        cond.notify_all()
1432        cond.release()
1433
1434        # check they have all woken
1435        self.assertReachesEventually(lambda: get_value(woken), 6)
1436
1437        # check state is not mucked up
1438        self.check_invariant(cond)
1439
1440    def test_notify_n(self):
1441        cond = self.Condition()
1442        sleeping = self.Semaphore(0)
1443        woken = self.Semaphore(0)
1444
1445        # start some threads/processes
1446        for i in range(3):
1447            p = self.Process(target=self.f, args=(cond, sleeping, woken))
1448            p.daemon = True
1449            p.start()
1450            self.addCleanup(p.join)
1451
1452            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
1453            t.daemon = True
1454            t.start()
1455            self.addCleanup(t.join)
1456
1457        # wait for them to all sleep
1458        for i in range(6):
1459            sleeping.acquire()
1460
1461        # check no process/thread has woken up
1462        time.sleep(DELTA)
1463        self.assertReturnsIfImplemented(0, get_value, woken)
1464
1465        # wake some of them up
1466        cond.acquire()
1467        cond.notify(n=2)
1468        cond.release()
1469
1470        # check 2 have woken
1471        self.assertReachesEventually(lambda: get_value(woken), 2)
1472
1473        # wake the rest of them
1474        cond.acquire()
1475        cond.notify(n=4)
1476        cond.release()
1477
1478        self.assertReachesEventually(lambda: get_value(woken), 6)
1479
1480        # doesn't do anything more
1481        cond.acquire()
1482        cond.notify(n=3)
1483        cond.release()
1484
1485        self.assertReturnsIfImplemented(6, get_value, woken)
1486
1487        # check state is not mucked up
1488        self.check_invariant(cond)
1489
1490    def test_timeout(self):
1491        cond = self.Condition()
1492        wait = TimingWrapper(cond.wait)
1493        cond.acquire()
1494        res = wait(TIMEOUT1)
1495        cond.release()
1496        self.assertEqual(res, False)
1497        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
1498
1499    @classmethod
1500    def _test_waitfor_f(cls, cond, state):
1501        with cond:
1502            state.value = 0
1503            cond.notify()
1504            result = cond.wait_for(lambda : state.value==4)
1505            if not result or state.value != 4:
1506                sys.exit(1)
1507
1508    @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
1509    def test_waitfor(self):
1510        # based on test in test/lock_tests.py
1511        cond = self.Condition()
1512        state = self.Value('i', -1)
1513
1514        p = self.Process(target=self._test_waitfor_f, args=(cond, state))
1515        p.daemon = True
1516        p.start()
1517
1518        with cond:
1519            result = cond.wait_for(lambda : state.value==0)
1520            self.assertTrue(result)
1521            self.assertEqual(state.value, 0)
1522
1523        for i in range(4):
1524            time.sleep(0.01)
1525            with cond:
1526                state.value += 1
1527                cond.notify()
1528
1529        join_process(p)
1530        self.assertEqual(p.exitcode, 0)
1531
1532    @classmethod
1533    def _test_waitfor_timeout_f(cls, cond, state, success, sem):
1534        sem.release()
1535        with cond:
1536            expected = 0.1
1537            dt = time.monotonic()
1538            result = cond.wait_for(lambda : state.value==4, timeout=expected)
1539            dt = time.monotonic() - dt
1540            # borrow logic in assertTimeout() from test/lock_tests.py
1541            if not result and expected * 0.6 < dt < expected * 10.0:
1542                success.value = True
1543
1544    @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
1545    def test_waitfor_timeout(self):
1546        # based on test in test/lock_tests.py
1547        cond = self.Condition()
1548        state = self.Value('i', 0)
1549        success = self.Value('i', False)
1550        sem = self.Semaphore(0)
1551
1552        p = self.Process(target=self._test_waitfor_timeout_f,
1553                         args=(cond, state, success, sem))
1554        p.daemon = True
1555        p.start()
1556        self.assertTrue(sem.acquire(timeout=TIMEOUT))
1557
1558        # Only increment 3 times, so state == 4 is never reached.
1559        for i in range(3):
1560            time.sleep(0.01)
1561            with cond:
1562                state.value += 1
1563                cond.notify()
1564
1565        join_process(p)
1566        self.assertTrue(success.value)
1567
1568    @classmethod
1569    def _test_wait_result(cls, c, pid):
1570        with c:
1571            c.notify()
1572        time.sleep(1)
1573        if pid is not None:
1574            os.kill(pid, signal.SIGINT)
1575
1576    def test_wait_result(self):
1577        if isinstance(self, ProcessesMixin) and sys.platform != 'win32':
1578            pid = os.getpid()
1579        else:
1580            pid = None
1581
1582        c = self.Condition()
1583        with c:
1584            self.assertFalse(c.wait(0))
1585            self.assertFalse(c.wait(0.1))
1586
1587            p = self.Process(target=self._test_wait_result, args=(c, pid))
1588            p.start()
1589
1590            self.assertTrue(c.wait(60))
1591            if pid is not None:
1592                self.assertRaises(KeyboardInterrupt, c.wait, 60)
1593
1594            p.join()
1595
1596
1597class _TestEvent(BaseTestCase):
1598
1599    @classmethod
1600    def _test_event(cls, event):
1601        time.sleep(TIMEOUT2)
1602        event.set()
1603
1604    def test_event(self):
1605        event = self.Event()
1606        wait = TimingWrapper(event.wait)
1607
1608        # Removed temporarily, due to API shear, this does not
1609        # work with threading._Event objects. is_set == isSet
1610        self.assertEqual(event.is_set(), False)
1611
1612        # Removed, threading.Event.wait() will return the value of the __flag
1613        # instead of None. API Shear with the semaphore backed mp.Event
1614        self.assertEqual(wait(0.0), False)
1615        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
1616        self.assertEqual(wait(TIMEOUT1), False)
1617        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
1618
1619        event.set()
1620
1621        # See note above on the API differences
1622        self.assertEqual(event.is_set(), True)
1623        self.assertEqual(wait(), True)
1624        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
1625        self.assertEqual(wait(TIMEOUT1), True)
1626        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
1627        # self.assertEqual(event.is_set(), True)
1628
1629        event.clear()
1630
1631        #self.assertEqual(event.is_set(), False)
1632
1633        p = self.Process(target=self._test_event, args=(event,))
1634        p.daemon = True
1635        p.start()
1636        self.assertEqual(wait(), True)
1637        p.join()
1638
1639#
1640# Tests for Barrier - adapted from tests in test/lock_tests.py
1641#
1642
1643# Many of the tests for threading.Barrier use a list as an atomic
1644# counter: a value is appended to increment the counter, and the
1645# length of the list gives the value.  We use the class DummyList
1646# for the same purpose.
1647
1648class _DummyList(object):
1649
1650    def __init__(self):
1651        wrapper = multiprocessing.heap.BufferWrapper(struct.calcsize('i'))
1652        lock = multiprocessing.Lock()
1653        self.__setstate__((wrapper, lock))
1654        self._lengthbuf[0] = 0
1655
1656    def __setstate__(self, state):
1657        (self._wrapper, self._lock) = state
1658        self._lengthbuf = self._wrapper.create_memoryview().cast('i')
1659
1660    def __getstate__(self):
1661        return (self._wrapper, self._lock)
1662
1663    def append(self, _):
1664        with self._lock:
1665            self._lengthbuf[0] += 1
1666
1667    def __len__(self):
1668        with self._lock:
1669            return self._lengthbuf[0]
1670
1671def _wait():
1672    # A crude wait/yield function not relying on synchronization primitives.
1673    time.sleep(0.01)
1674
1675
1676class Bunch(object):
1677    """
1678    A bunch of threads.
1679    """
1680    def __init__(self, namespace, f, args, n, wait_before_exit=False):
1681        """
1682        Construct a bunch of `n` threads running the same function `f`.
1683        If `wait_before_exit` is True, the threads won't terminate until
1684        do_finish() is called.
1685        """
1686        self.f = f
1687        self.args = args
1688        self.n = n
1689        self.started = namespace.DummyList()
1690        self.finished = namespace.DummyList()
1691        self._can_exit = namespace.Event()
1692        if not wait_before_exit:
1693            self._can_exit.set()
1694
1695        threads = []
1696        for i in range(n):
1697            p = namespace.Process(target=self.task)
1698            p.daemon = True
1699            p.start()
1700            threads.append(p)
1701
1702        def finalize(threads):
1703            for p in threads:
1704                p.join()
1705
1706        self._finalizer = weakref.finalize(self, finalize, threads)
1707
1708    def task(self):
1709        pid = os.getpid()
1710        self.started.append(pid)
1711        try:
1712            self.f(*self.args)
1713        finally:
1714            self.finished.append(pid)
1715            self._can_exit.wait(30)
1716            assert self._can_exit.is_set()
1717
1718    def wait_for_started(self):
1719        while len(self.started) < self.n:
1720            _wait()
1721
1722    def wait_for_finished(self):
1723        while len(self.finished) < self.n:
1724            _wait()
1725
1726    def do_finish(self):
1727        self._can_exit.set()
1728
1729    def close(self):
1730        self._finalizer()
1731
1732
1733class AppendTrue(object):
1734    def __init__(self, obj):
1735        self.obj = obj
1736    def __call__(self):
1737        self.obj.append(True)
1738
1739
1740class _TestBarrier(BaseTestCase):
1741    """
1742    Tests for Barrier objects.
1743    """
1744    N = 5
1745    defaultTimeout = 30.0  # XXX Slow Windows buildbots need generous timeout
1746
1747    def setUp(self):
1748        self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout)
1749
1750    def tearDown(self):
1751        self.barrier.abort()
1752        self.barrier = None
1753
1754    def DummyList(self):
1755        if self.TYPE == 'threads':
1756            return []
1757        elif self.TYPE == 'manager':
1758            return self.manager.list()
1759        else:
1760            return _DummyList()
1761
1762    def run_threads(self, f, args):
1763        b = Bunch(self, f, args, self.N-1)
1764        try:
1765            f(*args)
1766            b.wait_for_finished()
1767        finally:
1768            b.close()
1769
1770    @classmethod
1771    def multipass(cls, barrier, results, n):
1772        m = barrier.parties
1773        assert m == cls.N
1774        for i in range(n):
1775            results[0].append(True)
1776            assert len(results[1]) == i * m
1777            barrier.wait()
1778            results[1].append(True)
1779            assert len(results[0]) == (i + 1) * m
1780            barrier.wait()
1781        try:
1782            assert barrier.n_waiting == 0
1783        except NotImplementedError:
1784            pass
1785        assert not barrier.broken
1786
1787    def test_barrier(self, passes=1):
1788        """
1789        Test that a barrier is passed in lockstep
1790        """
1791        results = [self.DummyList(), self.DummyList()]
1792        self.run_threads(self.multipass, (self.barrier, results, passes))
1793
1794    def test_barrier_10(self):
1795        """
1796        Test that a barrier works for 10 consecutive runs
1797        """
1798        return self.test_barrier(10)
1799
1800    @classmethod
1801    def _test_wait_return_f(cls, barrier, queue):
1802        res = barrier.wait()
1803        queue.put(res)
1804
1805    def test_wait_return(self):
1806        """
1807        test the return value from barrier.wait
1808        """
1809        queue = self.Queue()
1810        self.run_threads(self._test_wait_return_f, (self.barrier, queue))
1811        results = [queue.get() for i in range(self.N)]
1812        self.assertEqual(results.count(0), 1)
1813        close_queue(queue)
1814
1815    @classmethod
1816    def _test_action_f(cls, barrier, results):
1817        barrier.wait()
1818        if len(results) != 1:
1819            raise RuntimeError
1820
1821    def test_action(self):
1822        """
1823        Test the 'action' callback
1824        """
1825        results = self.DummyList()
1826        barrier = self.Barrier(self.N, action=AppendTrue(results))
1827        self.run_threads(self._test_action_f, (barrier, results))
1828        self.assertEqual(len(results), 1)
1829
1830    @classmethod
1831    def _test_abort_f(cls, barrier, results1, results2):
1832        try:
1833            i = barrier.wait()
1834            if i == cls.N//2:
1835                raise RuntimeError
1836            barrier.wait()
1837            results1.append(True)
1838        except threading.BrokenBarrierError:
1839            results2.append(True)
1840        except RuntimeError:
1841            barrier.abort()
1842
1843    def test_abort(self):
1844        """
1845        Test that an abort will put the barrier in a broken state
1846        """
1847        results1 = self.DummyList()
1848        results2 = self.DummyList()
1849        self.run_threads(self._test_abort_f,
1850                         (self.barrier, results1, results2))
1851        self.assertEqual(len(results1), 0)
1852        self.assertEqual(len(results2), self.N-1)
1853        self.assertTrue(self.barrier.broken)
1854
1855    @classmethod
1856    def _test_reset_f(cls, barrier, results1, results2, results3):
1857        i = barrier.wait()
1858        if i == cls.N//2:
1859            # Wait until the other threads are all in the barrier.
1860            while barrier.n_waiting < cls.N-1:
1861                time.sleep(0.001)
1862            barrier.reset()
1863        else:
1864            try:
1865                barrier.wait()
1866                results1.append(True)
1867            except threading.BrokenBarrierError:
1868                results2.append(True)
1869        # Now, pass the barrier again
1870        barrier.wait()
1871        results3.append(True)
1872
1873    def test_reset(self):
1874        """
1875        Test that a 'reset' on a barrier frees the waiting threads
1876        """
1877        results1 = self.DummyList()
1878        results2 = self.DummyList()
1879        results3 = self.DummyList()
1880        self.run_threads(self._test_reset_f,
1881                         (self.barrier, results1, results2, results3))
1882        self.assertEqual(len(results1), 0)
1883        self.assertEqual(len(results2), self.N-1)
1884        self.assertEqual(len(results3), self.N)
1885
1886    @classmethod
1887    def _test_abort_and_reset_f(cls, barrier, barrier2,
1888                                results1, results2, results3):
1889        try:
1890            i = barrier.wait()
1891            if i == cls.N//2:
1892                raise RuntimeError
1893            barrier.wait()
1894            results1.append(True)
1895        except threading.BrokenBarrierError:
1896            results2.append(True)
1897        except RuntimeError:
1898            barrier.abort()
1899        # Synchronize and reset the barrier.  Must synchronize first so
1900        # that everyone has left it when we reset, and after so that no
1901        # one enters it before the reset.
1902        if barrier2.wait() == cls.N//2:
1903            barrier.reset()
1904        barrier2.wait()
1905        barrier.wait()
1906        results3.append(True)
1907
1908    def test_abort_and_reset(self):
1909        """
1910        Test that a barrier can be reset after being broken.
1911        """
1912        results1 = self.DummyList()
1913        results2 = self.DummyList()
1914        results3 = self.DummyList()
1915        barrier2 = self.Barrier(self.N)
1916
1917        self.run_threads(self._test_abort_and_reset_f,
1918                         (self.barrier, barrier2, results1, results2, results3))
1919        self.assertEqual(len(results1), 0)
1920        self.assertEqual(len(results2), self.N-1)
1921        self.assertEqual(len(results3), self.N)
1922
1923    @classmethod
1924    def _test_timeout_f(cls, barrier, results):
1925        i = barrier.wait()
1926        if i == cls.N//2:
1927            # One thread is late!
1928            time.sleep(1.0)
1929        try:
1930            barrier.wait(0.5)
1931        except threading.BrokenBarrierError:
1932            results.append(True)
1933
1934    def test_timeout(self):
1935        """
1936        Test wait(timeout)
1937        """
1938        results = self.DummyList()
1939        self.run_threads(self._test_timeout_f, (self.barrier, results))
1940        self.assertEqual(len(results), self.barrier.parties)
1941
1942    @classmethod
1943    def _test_default_timeout_f(cls, barrier, results):
1944        i = barrier.wait(cls.defaultTimeout)
1945        if i == cls.N//2:
1946            # One thread is later than the default timeout
1947            time.sleep(1.0)
1948        try:
1949            barrier.wait()
1950        except threading.BrokenBarrierError:
1951            results.append(True)
1952
1953    def test_default_timeout(self):
1954        """
1955        Test the barrier's default timeout
1956        """
1957        barrier = self.Barrier(self.N, timeout=0.5)
1958        results = self.DummyList()
1959        self.run_threads(self._test_default_timeout_f, (barrier, results))
1960        self.assertEqual(len(results), barrier.parties)
1961
1962    def test_single_thread(self):
1963        b = self.Barrier(1)
1964        b.wait()
1965        b.wait()
1966
1967    @classmethod
1968    def _test_thousand_f(cls, barrier, passes, conn, lock):
1969        for i in range(passes):
1970            barrier.wait()
1971            with lock:
1972                conn.send(i)
1973
1974    def test_thousand(self):
1975        if self.TYPE == 'manager':
1976            self.skipTest('test not appropriate for {}'.format(self.TYPE))
1977        passes = 1000
1978        lock = self.Lock()
1979        conn, child_conn = self.Pipe(False)
1980        for j in range(self.N):
1981            p = self.Process(target=self._test_thousand_f,
1982                           args=(self.barrier, passes, child_conn, lock))
1983            p.start()
1984            self.addCleanup(p.join)
1985
1986        for i in range(passes):
1987            for j in range(self.N):
1988                self.assertEqual(conn.recv(), i)
1989
1990#
1991#
1992#
1993
1994class _TestValue(BaseTestCase):
1995
1996    ALLOWED_TYPES = ('processes',)
1997
1998    codes_values = [
1999        ('i', 4343, 24234),
2000        ('d', 3.625, -4.25),
2001        ('h', -232, 234),
2002        ('q', 2 ** 33, 2 ** 34),
2003        ('c', latin('x'), latin('y'))
2004        ]
2005
2006    def setUp(self):
2007        if not HAS_SHAREDCTYPES:
2008            self.skipTest("requires multiprocessing.sharedctypes")
2009
2010    @classmethod
2011    def _test(cls, values):
2012        for sv, cv in zip(values, cls.codes_values):
2013            sv.value = cv[2]
2014
2015
2016    def test_value(self, raw=False):
2017        if raw:
2018            values = [self.RawValue(code, value)
2019                      for code, value, _ in self.codes_values]
2020        else:
2021            values = [self.Value(code, value)
2022                      for code, value, _ in self.codes_values]
2023
2024        for sv, cv in zip(values, self.codes_values):
2025            self.assertEqual(sv.value, cv[1])
2026
2027        proc = self.Process(target=self._test, args=(values,))
2028        proc.daemon = True
2029        proc.start()
2030        proc.join()
2031
2032        for sv, cv in zip(values, self.codes_values):
2033            self.assertEqual(sv.value, cv[2])
2034
2035    def test_rawvalue(self):
2036        self.test_value(raw=True)
2037
2038    def test_getobj_getlock(self):
2039        val1 = self.Value('i', 5)
2040        lock1 = val1.get_lock()
2041        obj1 = val1.get_obj()
2042
2043        val2 = self.Value('i', 5, lock=None)
2044        lock2 = val2.get_lock()
2045        obj2 = val2.get_obj()
2046
2047        lock = self.Lock()
2048        val3 = self.Value('i', 5, lock=lock)
2049        lock3 = val3.get_lock()
2050        obj3 = val3.get_obj()
2051        self.assertEqual(lock, lock3)
2052
2053        arr4 = self.Value('i', 5, lock=False)
2054        self.assertFalse(hasattr(arr4, 'get_lock'))
2055        self.assertFalse(hasattr(arr4, 'get_obj'))
2056
2057        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
2058
2059        arr5 = self.RawValue('i', 5)
2060        self.assertFalse(hasattr(arr5, 'get_lock'))
2061        self.assertFalse(hasattr(arr5, 'get_obj'))
2062
2063
2064class _TestArray(BaseTestCase):
2065
2066    ALLOWED_TYPES = ('processes',)
2067
2068    @classmethod
2069    def f(cls, seq):
2070        for i in range(1, len(seq)):
2071            seq[i] += seq[i-1]
2072
2073    @unittest.skipIf(c_int is None, "requires _ctypes")
2074    def test_array(self, raw=False):
2075        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
2076        if raw:
2077            arr = self.RawArray('i', seq)
2078        else:
2079            arr = self.Array('i', seq)
2080
2081        self.assertEqual(len(arr), len(seq))
2082        self.assertEqual(arr[3], seq[3])
2083        self.assertEqual(list(arr[2:7]), list(seq[2:7]))
2084
2085        arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
2086
2087        self.assertEqual(list(arr[:]), seq)
2088
2089        self.f(seq)
2090
2091        p = self.Process(target=self.f, args=(arr,))
2092        p.daemon = True
2093        p.start()
2094        p.join()
2095
2096        self.assertEqual(list(arr[:]), seq)
2097
2098    @unittest.skipIf(c_int is None, "requires _ctypes")
2099    def test_array_from_size(self):
2100        size = 10
2101        # Test for zeroing (see issue #11675).
2102        # The repetition below strengthens the test by increasing the chances
2103        # of previously allocated non-zero memory being used for the new array
2104        # on the 2nd and 3rd loops.
2105        for _ in range(3):
2106            arr = self.Array('i', size)
2107            self.assertEqual(len(arr), size)
2108            self.assertEqual(list(arr), [0] * size)
2109            arr[:] = range(10)
2110            self.assertEqual(list(arr), list(range(10)))
2111            del arr
2112
2113    @unittest.skipIf(c_int is None, "requires _ctypes")
2114    def test_rawarray(self):
2115        self.test_array(raw=True)
2116
2117    @unittest.skipIf(c_int is None, "requires _ctypes")
2118    def test_getobj_getlock_obj(self):
2119        arr1 = self.Array('i', list(range(10)))
2120        lock1 = arr1.get_lock()
2121        obj1 = arr1.get_obj()
2122
2123        arr2 = self.Array('i', list(range(10)), lock=None)
2124        lock2 = arr2.get_lock()
2125        obj2 = arr2.get_obj()
2126
2127        lock = self.Lock()
2128        arr3 = self.Array('i', list(range(10)), lock=lock)
2129        lock3 = arr3.get_lock()
2130        obj3 = arr3.get_obj()
2131        self.assertEqual(lock, lock3)
2132
2133        arr4 = self.Array('i', range(10), lock=False)
2134        self.assertFalse(hasattr(arr4, 'get_lock'))
2135        self.assertFalse(hasattr(arr4, 'get_obj'))
2136        self.assertRaises(AttributeError,
2137                          self.Array, 'i', range(10), lock='notalock')
2138
2139        arr5 = self.RawArray('i', range(10))
2140        self.assertFalse(hasattr(arr5, 'get_lock'))
2141        self.assertFalse(hasattr(arr5, 'get_obj'))
2142
2143#
2144#
2145#
2146
2147class _TestContainers(BaseTestCase):
2148
2149    ALLOWED_TYPES = ('manager',)
2150
2151    def test_list(self):
2152        a = self.list(list(range(10)))
2153        self.assertEqual(a[:], list(range(10)))
2154
2155        b = self.list()
2156        self.assertEqual(b[:], [])
2157
2158        b.extend(list(range(5)))
2159        self.assertEqual(b[:], list(range(5)))
2160
2161        self.assertEqual(b[2], 2)
2162        self.assertEqual(b[2:10], [2,3,4])
2163
2164        b *= 2
2165        self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
2166
2167        self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
2168
2169        self.assertEqual(a[:], list(range(10)))
2170
2171        d = [a, b]
2172        e = self.list(d)
2173        self.assertEqual(
2174            [element[:] for element in e],
2175            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
2176            )
2177
2178        f = self.list([a])
2179        a.append('hello')
2180        self.assertEqual(f[0][:], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello'])
2181
2182    def test_list_iter(self):
2183        a = self.list(list(range(10)))
2184        it = iter(a)
2185        self.assertEqual(list(it), list(range(10)))
2186        self.assertEqual(list(it), [])  # exhausted
2187        # list modified during iteration
2188        it = iter(a)
2189        a[0] = 100
2190        self.assertEqual(next(it), 100)
2191
2192    def test_list_proxy_in_list(self):
2193        a = self.list([self.list(range(3)) for _i in range(3)])
2194        self.assertEqual([inner[:] for inner in a], [[0, 1, 2]] * 3)
2195
2196        a[0][-1] = 55
2197        self.assertEqual(a[0][:], [0, 1, 55])
2198        for i in range(1, 3):
2199            self.assertEqual(a[i][:], [0, 1, 2])
2200
2201        self.assertEqual(a[1].pop(), 2)
2202        self.assertEqual(len(a[1]), 2)
2203        for i in range(0, 3, 2):
2204            self.assertEqual(len(a[i]), 3)
2205
2206        del a
2207
2208        b = self.list()
2209        b.append(b)
2210        del b
2211
2212    def test_dict(self):
2213        d = self.dict()
2214        indices = list(range(65, 70))
2215        for i in indices:
2216            d[i] = chr(i)
2217        self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
2218        self.assertEqual(sorted(d.keys()), indices)
2219        self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
2220        self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
2221
2222    def test_dict_iter(self):
2223        d = self.dict()
2224        indices = list(range(65, 70))
2225        for i in indices:
2226            d[i] = chr(i)
2227        it = iter(d)
2228        self.assertEqual(list(it), indices)
2229        self.assertEqual(list(it), [])  # exhausted
2230        # dictionary changed size during iteration
2231        it = iter(d)
2232        d.clear()
2233        self.assertRaises(RuntimeError, next, it)
2234
2235    def test_dict_proxy_nested(self):
2236        pets = self.dict(ferrets=2, hamsters=4)
2237        supplies = self.dict(water=10, feed=3)
2238        d = self.dict(pets=pets, supplies=supplies)
2239
2240        self.assertEqual(supplies['water'], 10)
2241        self.assertEqual(d['supplies']['water'], 10)
2242
2243        d['supplies']['blankets'] = 5
2244        self.assertEqual(supplies['blankets'], 5)
2245        self.assertEqual(d['supplies']['blankets'], 5)
2246
2247        d['supplies']['water'] = 7
2248        self.assertEqual(supplies['water'], 7)
2249        self.assertEqual(d['supplies']['water'], 7)
2250
2251        del pets
2252        del supplies
2253        self.assertEqual(d['pets']['ferrets'], 2)
2254        d['supplies']['blankets'] = 11
2255        self.assertEqual(d['supplies']['blankets'], 11)
2256
2257        pets = d['pets']
2258        supplies = d['supplies']
2259        supplies['water'] = 7
2260        self.assertEqual(supplies['water'], 7)
2261        self.assertEqual(d['supplies']['water'], 7)
2262
2263        d.clear()
2264        self.assertEqual(len(d), 0)
2265        self.assertEqual(supplies['water'], 7)
2266        self.assertEqual(pets['hamsters'], 4)
2267
2268        l = self.list([pets, supplies])
2269        l[0]['marmots'] = 1
2270        self.assertEqual(pets['marmots'], 1)
2271        self.assertEqual(l[0]['marmots'], 1)
2272
2273        del pets
2274        del supplies
2275        self.assertEqual(l[0]['marmots'], 1)
2276
2277        outer = self.list([[88, 99], l])
2278        self.assertIsInstance(outer[0], list)  # Not a ListProxy
2279        self.assertEqual(outer[-1][-1]['feed'], 3)
2280
2281    def test_namespace(self):
2282        n = self.Namespace()
2283        n.name = 'Bob'
2284        n.job = 'Builder'
2285        n._hidden = 'hidden'
2286        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
2287        del n.job
2288        self.assertEqual(str(n), "Namespace(name='Bob')")
2289        self.assertTrue(hasattr(n, 'name'))
2290        self.assertTrue(not hasattr(n, 'job'))
2291
2292#
2293#
2294#
2295
2296def sqr(x, wait=0.0):
2297    time.sleep(wait)
2298    return x*x
2299
2300def mul(x, y):
2301    return x*y
2302
2303def raise_large_valuerror(wait):
2304    time.sleep(wait)
2305    raise ValueError("x" * 1024**2)
2306
2307def identity(x):
2308    return x
2309
2310class CountedObject(object):
2311    n_instances = 0
2312
2313    def __new__(cls):
2314        cls.n_instances += 1
2315        return object.__new__(cls)
2316
2317    def __del__(self):
2318        type(self).n_instances -= 1
2319
2320class SayWhenError(ValueError): pass
2321
2322def exception_throwing_generator(total, when):
2323    if when == -1:
2324        raise SayWhenError("Somebody said when")
2325    for i in range(total):
2326        if i == when:
2327            raise SayWhenError("Somebody said when")
2328        yield i
2329
2330
2331class _TestPool(BaseTestCase):
2332
2333    @classmethod
2334    def setUpClass(cls):
2335        super().setUpClass()
2336        cls.pool = cls.Pool(4)
2337
2338    @classmethod
2339    def tearDownClass(cls):
2340        cls.pool.terminate()
2341        cls.pool.join()
2342        cls.pool = None
2343        super().tearDownClass()
2344
2345    def test_apply(self):
2346        papply = self.pool.apply
2347        self.assertEqual(papply(sqr, (5,)), sqr(5))
2348        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
2349
2350    def test_map(self):
2351        pmap = self.pool.map
2352        self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
2353        self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
2354                         list(map(sqr, list(range(100)))))
2355
2356    def test_starmap(self):
2357        psmap = self.pool.starmap
2358        tuples = list(zip(range(10), range(9,-1, -1)))
2359        self.assertEqual(psmap(mul, tuples),
2360                         list(itertools.starmap(mul, tuples)))
2361        tuples = list(zip(range(100), range(99,-1, -1)))
2362        self.assertEqual(psmap(mul, tuples, chunksize=20),
2363                         list(itertools.starmap(mul, tuples)))
2364
2365    def test_starmap_async(self):
2366        tuples = list(zip(range(100), range(99,-1, -1)))
2367        self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
2368                         list(itertools.starmap(mul, tuples)))
2369
2370    def test_map_async(self):
2371        self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(),
2372                         list(map(sqr, list(range(10)))))
2373
2374    def test_map_async_callbacks(self):
2375        call_args = self.manager.list() if self.TYPE == 'manager' else []
2376        self.pool.map_async(int, ['1'],
2377                            callback=call_args.append,
2378                            error_callback=call_args.append).wait()
2379        self.assertEqual(1, len(call_args))
2380        self.assertEqual([1], call_args[0])
2381        self.pool.map_async(int, ['a'],
2382                            callback=call_args.append,
2383                            error_callback=call_args.append).wait()
2384        self.assertEqual(2, len(call_args))
2385        self.assertIsInstance(call_args[1], ValueError)
2386
2387    def test_map_unplicklable(self):
2388        # Issue #19425 -- failure to pickle should not cause a hang
2389        if self.TYPE == 'threads':
2390            self.skipTest('test not appropriate for {}'.format(self.TYPE))
2391        class A(object):
2392            def __reduce__(self):
2393                raise RuntimeError('cannot pickle')
2394        with self.assertRaises(RuntimeError):
2395            self.pool.map(sqr, [A()]*10)
2396
2397    def test_map_chunksize(self):
2398        try:
2399            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
2400        except multiprocessing.TimeoutError:
2401            self.fail("pool.map_async with chunksize stalled on null list")
2402
2403    def test_map_handle_iterable_exception(self):
2404        if self.TYPE == 'manager':
2405            self.skipTest('test not appropriate for {}'.format(self.TYPE))
2406
2407        # SayWhenError seen at the very first of the iterable
2408        with self.assertRaises(SayWhenError):
2409            self.pool.map(sqr, exception_throwing_generator(1, -1), 1)
2410        # again, make sure it's reentrant
2411        with self.assertRaises(SayWhenError):
2412            self.pool.map(sqr, exception_throwing_generator(1, -1), 1)
2413
2414        with self.assertRaises(SayWhenError):
2415            self.pool.map(sqr, exception_throwing_generator(10, 3), 1)
2416
2417        class SpecialIterable:
2418            def __iter__(self):
2419                return self
2420            def __next__(self):
2421                raise SayWhenError
2422            def __len__(self):
2423                return 1
2424        with self.assertRaises(SayWhenError):
2425            self.pool.map(sqr, SpecialIterable(), 1)
2426        with self.assertRaises(SayWhenError):
2427            self.pool.map(sqr, SpecialIterable(), 1)
2428
2429    def test_async(self):
2430        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
2431        get = TimingWrapper(res.get)
2432        self.assertEqual(get(), 49)
2433        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
2434
2435    def test_async_timeout(self):
2436        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0))
2437        get = TimingWrapper(res.get)
2438        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
2439        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
2440
2441    def test_imap(self):
2442        it = self.pool.imap(sqr, list(range(10)))
2443        self.assertEqual(list(it), list(map(sqr, list(range(10)))))
2444
2445        it = self.pool.imap(sqr, list(range(10)))
2446        for i in range(10):
2447            self.assertEqual(next(it), i*i)
2448        self.assertRaises(StopIteration, it.__next__)
2449
2450        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
2451        for i in range(1000):
2452            self.assertEqual(next(it), i*i)
2453        self.assertRaises(StopIteration, it.__next__)
2454
2455    def test_imap_handle_iterable_exception(self):
2456        if self.TYPE == 'manager':
2457            self.skipTest('test not appropriate for {}'.format(self.TYPE))
2458
2459        # SayWhenError seen at the very first of the iterable
2460        it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1)
2461        self.assertRaises(SayWhenError, it.__next__)
2462        # again, make sure it's reentrant
2463        it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1)
2464        self.assertRaises(SayWhenError, it.__next__)
2465
2466        it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
2467        for i in range(3):
2468            self.assertEqual(next(it), i*i)
2469        self.assertRaises(SayWhenError, it.__next__)
2470
2471        # SayWhenError seen at start of problematic chunk's results
2472        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2)
2473        for i in range(6):
2474            self.assertEqual(next(it), i*i)
2475        self.assertRaises(SayWhenError, it.__next__)
2476        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4)
2477        for i in range(4):
2478            self.assertEqual(next(it), i*i)
2479        self.assertRaises(SayWhenError, it.__next__)
2480
2481    def test_imap_unordered(self):
2482        it = self.pool.imap_unordered(sqr, list(range(10)))
2483        self.assertEqual(sorted(it), list(map(sqr, list(range(10)))))
2484
2485        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=100)
2486        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
2487
2488    def test_imap_unordered_handle_iterable_exception(self):
2489        if self.TYPE == 'manager':
2490            self.skipTest('test not appropriate for {}'.format(self.TYPE))
2491
2492        # SayWhenError seen at the very first of the iterable
2493        it = self.pool.imap_unordered(sqr,
2494                                      exception_throwing_generator(1, -1),
2495                                      1)
2496        self.assertRaises(SayWhenError, it.__next__)
2497        # again, make sure it's reentrant
2498        it = self.pool.imap_unordered(sqr,
2499                                      exception_throwing_generator(1, -1),
2500                                      1)
2501        self.assertRaises(SayWhenError, it.__next__)
2502
2503        it = self.pool.imap_unordered(sqr,
2504                                      exception_throwing_generator(10, 3),
2505                                      1)
2506        expected_values = list(map(sqr, list(range(10))))
2507        with self.assertRaises(SayWhenError):
2508            # imap_unordered makes it difficult to anticipate the SayWhenError
2509            for i in range(10):
2510                value = next(it)
2511                self.assertIn(value, expected_values)
2512                expected_values.remove(value)
2513
2514        it = self.pool.imap_unordered(sqr,
2515                                      exception_throwing_generator(20, 7),
2516                                      2)
2517        expected_values = list(map(sqr, list(range(20))))
2518        with self.assertRaises(SayWhenError):
2519            for i in range(20):
2520                value = next(it)
2521                self.assertIn(value, expected_values)
2522                expected_values.remove(value)
2523
2524    def test_make_pool(self):
2525        expected_error = (RemoteError if self.TYPE == 'manager'
2526                          else ValueError)
2527
2528        self.assertRaises(expected_error, self.Pool, -1)
2529        self.assertRaises(expected_error, self.Pool, 0)
2530
2531        if self.TYPE != 'manager':
2532            p = self.Pool(3)
2533            try:
2534                self.assertEqual(3, len(p._pool))
2535            finally:
2536                p.close()
2537                p.join()
2538
2539    def test_terminate(self):
2540        result = self.pool.map_async(
2541            time.sleep, [0.1 for i in range(10000)], chunksize=1
2542            )
2543        self.pool.terminate()
2544        join = TimingWrapper(self.pool.join)
2545        join()
2546        # Sanity check the pool didn't wait for all tasks to finish
2547        self.assertLess(join.elapsed, 2.0)
2548
2549    def test_empty_iterable(self):
2550        # See Issue 12157
2551        p = self.Pool(1)
2552
2553        self.assertEqual(p.map(sqr, []), [])
2554        self.assertEqual(list(p.imap(sqr, [])), [])
2555        self.assertEqual(list(p.imap_unordered(sqr, [])), [])
2556        self.assertEqual(p.map_async(sqr, []).get(), [])
2557
2558        p.close()
2559        p.join()
2560
2561    def test_context(self):
2562        if self.TYPE == 'processes':
2563            L = list(range(10))
2564            expected = [sqr(i) for i in L]
2565            with self.Pool(2) as p:
2566                r = p.map_async(sqr, L)
2567                self.assertEqual(r.get(), expected)
2568            p.join()
2569            self.assertRaises(ValueError, p.map_async, sqr, L)
2570
2571    @classmethod
2572    def _test_traceback(cls):
2573        raise RuntimeError(123) # some comment
2574
2575    def test_traceback(self):
2576        # We want ensure that the traceback from the child process is
2577        # contained in the traceback raised in the main process.
2578        if self.TYPE == 'processes':
2579            with self.Pool(1) as p:
2580                try:
2581                    p.apply(self._test_traceback)
2582                except Exception as e:
2583                    exc = e
2584                else:
2585                    self.fail('expected RuntimeError')
2586            p.join()
2587            self.assertIs(type(exc), RuntimeError)
2588            self.assertEqual(exc.args, (123,))
2589            cause = exc.__cause__
2590            self.assertIs(type(cause), multiprocessing.pool.RemoteTraceback)
2591            self.assertIn('raise RuntimeError(123) # some comment', cause.tb)
2592
2593            with test.support.captured_stderr() as f1:
2594                try:
2595                    raise exc
2596                except RuntimeError:
2597                    sys.excepthook(*sys.exc_info())
2598            self.assertIn('raise RuntimeError(123) # some comment',
2599                          f1.getvalue())
2600            # _helper_reraises_exception should not make the error
2601            # a remote exception
2602            with self.Pool(1) as p:
2603                try:
2604                    p.map(sqr, exception_throwing_generator(1, -1), 1)
2605                except Exception as e:
2606                    exc = e
2607                else:
2608                    self.fail('expected SayWhenError')
2609                self.assertIs(type(exc), SayWhenError)
2610                self.assertIs(exc.__cause__, None)
2611            p.join()
2612
2613    @classmethod
2614    def _test_wrapped_exception(cls):
2615        raise RuntimeError('foo')
2616
2617    def test_wrapped_exception(self):
2618        # Issue #20980: Should not wrap exception when using thread pool
2619        with self.Pool(1) as p:
2620            with self.assertRaises(RuntimeError):
2621                p.apply(self._test_wrapped_exception)
2622        p.join()
2623
2624    def test_map_no_failfast(self):
2625        # Issue #23992: the fail-fast behaviour when an exception is raised
2626        # during map() would make Pool.join() deadlock, because a worker
2627        # process would fill the result queue (after the result handler thread
2628        # terminated, hence not draining it anymore).
2629
2630        t_start = time.monotonic()
2631
2632        with self.assertRaises(ValueError):
2633            with self.Pool(2) as p:
2634                try:
2635                    p.map(raise_large_valuerror, [0, 1])
2636                finally:
2637                    time.sleep(0.5)
2638                    p.close()
2639                    p.join()
2640
2641        # check that we indeed waited for all jobs
2642        self.assertGreater(time.monotonic() - t_start, 0.9)
2643
2644    def test_release_task_refs(self):
2645        # Issue #29861: task arguments and results should not be kept
2646        # alive after we are done with them.
2647        objs = [CountedObject() for i in range(10)]
2648        refs = [weakref.ref(o) for o in objs]
2649        self.pool.map(identity, objs)
2650
2651        del objs
2652        time.sleep(DELTA)  # let threaded cleanup code run
2653        self.assertEqual(set(wr() for wr in refs), {None})
2654        # With a process pool, copies of the objects are returned, check
2655        # they were released too.
2656        self.assertEqual(CountedObject.n_instances, 0)
2657
2658    def test_enter(self):
2659        if self.TYPE == 'manager':
2660            self.skipTest("test not applicable to manager")
2661
2662        pool = self.Pool(1)
2663        with pool:
2664            pass
2665            # call pool.terminate()
2666        # pool is no longer running
2667
2668        with self.assertRaises(ValueError):
2669            # bpo-35477: pool.__enter__() fails if the pool is not running
2670            with pool:
2671                pass
2672        pool.join()
2673
2674    def test_resource_warning(self):
2675        if self.TYPE == 'manager':
2676            self.skipTest("test not applicable to manager")
2677
2678        pool = self.Pool(1)
2679        pool.terminate()
2680        pool.join()
2681
2682        # force state to RUN to emit ResourceWarning in __del__()
2683        pool._state = multiprocessing.pool.RUN
2684
2685        with support.check_warnings(('unclosed running multiprocessing pool',
2686                                     ResourceWarning)):
2687            pool = None
2688            support.gc_collect()
2689
2690def raising():
2691    raise KeyError("key")
2692
2693def unpickleable_result():
2694    return lambda: 42
2695
2696class _TestPoolWorkerErrors(BaseTestCase):
2697    ALLOWED_TYPES = ('processes', )
2698
2699    def test_async_error_callback(self):
2700        p = multiprocessing.Pool(2)
2701
2702        scratchpad = [None]
2703        def errback(exc):
2704            scratchpad[0] = exc
2705
2706        res = p.apply_async(raising, error_callback=errback)
2707        self.assertRaises(KeyError, res.get)
2708        self.assertTrue(scratchpad[0])
2709        self.assertIsInstance(scratchpad[0], KeyError)
2710
2711        p.close()
2712        p.join()
2713
2714    def test_unpickleable_result(self):
2715        from multiprocessing.pool import MaybeEncodingError
2716        p = multiprocessing.Pool(2)
2717
2718        # Make sure we don't lose pool processes because of encoding errors.
2719        for iteration in range(20):
2720
2721            scratchpad = [None]
2722            def errback(exc):
2723                scratchpad[0] = exc
2724
2725            res = p.apply_async(unpickleable_result, error_callback=errback)
2726            self.assertRaises(MaybeEncodingError, res.get)
2727            wrapped = scratchpad[0]
2728            self.assertTrue(wrapped)
2729            self.assertIsInstance(scratchpad[0], MaybeEncodingError)
2730            self.assertIsNotNone(wrapped.exc)
2731            self.assertIsNotNone(wrapped.value)
2732
2733        p.close()
2734        p.join()
2735
2736class _TestPoolWorkerLifetime(BaseTestCase):
2737    ALLOWED_TYPES = ('processes', )
2738
2739    def test_pool_worker_lifetime(self):
2740        p = multiprocessing.Pool(3, maxtasksperchild=10)
2741        self.assertEqual(3, len(p._pool))
2742        origworkerpids = [w.pid for w in p._pool]
2743        # Run many tasks so each worker gets replaced (hopefully)
2744        results = []
2745        for i in range(100):
2746            results.append(p.apply_async(sqr, (i, )))
2747        # Fetch the results and verify we got the right answers,
2748        # also ensuring all the tasks have completed.
2749        for (j, res) in enumerate(results):
2750            self.assertEqual(res.get(), sqr(j))
2751        # Refill the pool
2752        p._repopulate_pool()
2753        # Wait until all workers are alive
2754        # (countdown * DELTA = 5 seconds max startup process time)
2755        countdown = 50
2756        while countdown and not all(w.is_alive() for w in p._pool):
2757            countdown -= 1
2758            time.sleep(DELTA)
2759        finalworkerpids = [w.pid for w in p._pool]
2760        # All pids should be assigned.  See issue #7805.
2761        self.assertNotIn(None, origworkerpids)
2762        self.assertNotIn(None, finalworkerpids)
2763        # Finally, check that the worker pids have changed
2764        self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
2765        p.close()
2766        p.join()
2767
2768    def test_pool_worker_lifetime_early_close(self):
2769        # Issue #10332: closing a pool whose workers have limited lifetimes
2770        # before all the tasks completed would make join() hang.
2771        p = multiprocessing.Pool(3, maxtasksperchild=1)
2772        results = []
2773        for i in range(6):
2774            results.append(p.apply_async(sqr, (i, 0.3)))
2775        p.close()
2776        p.join()
2777        # check the results
2778        for (j, res) in enumerate(results):
2779            self.assertEqual(res.get(), sqr(j))
2780
2781#
2782# Test of creating a customized manager class
2783#
2784
2785from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
2786
2787class FooBar(object):
2788    def f(self):
2789        return 'f()'
2790    def g(self):
2791        raise ValueError
2792    def _h(self):
2793        return '_h()'
2794
2795def baz():
2796    for i in range(10):
2797        yield i*i
2798
2799class IteratorProxy(BaseProxy):
2800    _exposed_ = ('__next__',)
2801    def __iter__(self):
2802        return self
2803    def __next__(self):
2804        return self._callmethod('__next__')
2805
2806class MyManager(BaseManager):
2807    pass
2808
2809MyManager.register('Foo', callable=FooBar)
2810MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
2811MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
2812
2813
2814class _TestMyManager(BaseTestCase):
2815
2816    ALLOWED_TYPES = ('manager',)
2817
2818    def test_mymanager(self):
2819        manager = MyManager()
2820        manager.start()
2821        self.common(manager)
2822        manager.shutdown()
2823
2824        # bpo-30356: BaseManager._finalize_manager() sends SIGTERM
2825        # to the manager process if it takes longer than 1 second to stop,
2826        # which happens on slow buildbots.
2827        self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM))
2828
2829    def test_mymanager_context(self):
2830        with MyManager() as manager:
2831            self.common(manager)
2832        # bpo-30356: BaseManager._finalize_manager() sends SIGTERM
2833        # to the manager process if it takes longer than 1 second to stop,
2834        # which happens on slow buildbots.
2835        self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM))
2836
2837    def test_mymanager_context_prestarted(self):
2838        manager = MyManager()
2839        manager.start()
2840        with manager:
2841            self.common(manager)
2842        self.assertEqual(manager._process.exitcode, 0)
2843
2844    def common(self, manager):
2845        foo = manager.Foo()
2846        bar = manager.Bar()
2847        baz = manager.baz()
2848
2849        foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
2850        bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
2851
2852        self.assertEqual(foo_methods, ['f', 'g'])
2853        self.assertEqual(bar_methods, ['f', '_h'])
2854
2855        self.assertEqual(foo.f(), 'f()')
2856        self.assertRaises(ValueError, foo.g)
2857        self.assertEqual(foo._callmethod('f'), 'f()')
2858        self.assertRaises(RemoteError, foo._callmethod, '_h')
2859
2860        self.assertEqual(bar.f(), 'f()')
2861        self.assertEqual(bar._h(), '_h()')
2862        self.assertEqual(bar._callmethod('f'), 'f()')
2863        self.assertEqual(bar._callmethod('_h'), '_h()')
2864
2865        self.assertEqual(list(baz), [i*i for i in range(10)])
2866
2867
2868#
2869# Test of connecting to a remote server and using xmlrpclib for serialization
2870#
2871
2872_queue = pyqueue.Queue()
2873def get_queue():
2874    return _queue
2875
2876class QueueManager(BaseManager):
2877    '''manager class used by server process'''
2878QueueManager.register('get_queue', callable=get_queue)
2879
2880class QueueManager2(BaseManager):
2881    '''manager class which specifies the same interface as QueueManager'''
2882QueueManager2.register('get_queue')
2883
2884
2885SERIALIZER = 'xmlrpclib'
2886
2887class _TestRemoteManager(BaseTestCase):
2888
2889    ALLOWED_TYPES = ('manager',)
2890    values = ['hello world', None, True, 2.25,
2891              'hall\xe5 v\xe4rlden',
2892              '\u043f\u0440\u0438\u0432\u0456\u0442 \u0441\u0432\u0456\u0442',
2893              b'hall\xe5 v\xe4rlden',
2894             ]
2895    result = values[:]
2896
2897    @classmethod
2898    def _putter(cls, address, authkey):
2899        manager = QueueManager2(
2900            address=address, authkey=authkey, serializer=SERIALIZER
2901            )
2902        manager.connect()
2903        queue = manager.get_queue()
2904        # Note that xmlrpclib will deserialize object as a list not a tuple
2905        queue.put(tuple(cls.values))
2906
2907    def test_remote(self):
2908        authkey = os.urandom(32)
2909
2910        manager = QueueManager(
2911            address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER
2912            )
2913        manager.start()
2914        self.addCleanup(manager.shutdown)
2915
2916        p = self.Process(target=self._putter, args=(manager.address, authkey))
2917        p.daemon = True
2918        p.start()
2919
2920        manager2 = QueueManager2(
2921            address=manager.address, authkey=authkey, serializer=SERIALIZER
2922            )
2923        manager2.connect()
2924        queue = manager2.get_queue()
2925
2926        self.assertEqual(queue.get(), self.result)
2927
2928        # Because we are using xmlrpclib for serialization instead of
2929        # pickle this will cause a serialization error.
2930        self.assertRaises(Exception, queue.put, time.sleep)
2931
2932        # Make queue finalizer run before the server is stopped
2933        del queue
2934
2935class _TestManagerRestart(BaseTestCase):
2936
2937    @classmethod
2938    def _putter(cls, address, authkey):
2939        manager = QueueManager(
2940            address=address, authkey=authkey, serializer=SERIALIZER)
2941        manager.connect()
2942        queue = manager.get_queue()
2943        queue.put('hello world')
2944
2945    def test_rapid_restart(self):
2946        authkey = os.urandom(32)
2947        manager = QueueManager(
2948            address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
2949        try:
2950            srvr = manager.get_server()
2951            addr = srvr.address
2952            # Close the connection.Listener socket which gets opened as a part
2953            # of manager.get_server(). It's not needed for the test.
2954            srvr.listener.close()
2955            manager.start()
2956
2957            p = self.Process(target=self._putter, args=(manager.address, authkey))
2958            p.start()
2959            p.join()
2960            queue = manager.get_queue()
2961            self.assertEqual(queue.get(), 'hello world')
2962            del queue
2963        finally:
2964            if hasattr(manager, "shutdown"):
2965                manager.shutdown()
2966
2967        manager = QueueManager(
2968            address=addr, authkey=authkey, serializer=SERIALIZER)
2969        try:
2970            manager.start()
2971            self.addCleanup(manager.shutdown)
2972        except OSError as e:
2973            if e.errno != errno.EADDRINUSE:
2974                raise
2975            # Retry after some time, in case the old socket was lingering
2976            # (sporadic failure on buildbots)
2977            time.sleep(1.0)
2978            manager = QueueManager(
2979                address=addr, authkey=authkey, serializer=SERIALIZER)
2980            if hasattr(manager, "shutdown"):
2981                self.addCleanup(manager.shutdown)
2982
2983#
2984#
2985#
2986
2987SENTINEL = latin('')
2988
2989class _TestConnection(BaseTestCase):
2990
2991    ALLOWED_TYPES = ('processes', 'threads')
2992
2993    @classmethod
2994    def _echo(cls, conn):
2995        for msg in iter(conn.recv_bytes, SENTINEL):
2996            conn.send_bytes(msg)
2997        conn.close()
2998
2999    def test_connection(self):
3000        conn, child_conn = self.Pipe()
3001
3002        p = self.Process(target=self._echo, args=(child_conn,))
3003        p.daemon = True
3004        p.start()
3005
3006        seq = [1, 2.25, None]
3007        msg = latin('hello world')
3008        longmsg = msg * 10
3009        arr = array.array('i', list(range(4)))
3010
3011        if self.TYPE == 'processes':
3012            self.assertEqual(type(conn.fileno()), int)
3013
3014        self.assertEqual(conn.send(seq), None)
3015        self.assertEqual(conn.recv(), seq)
3016
3017        self.assertEqual(conn.send_bytes(msg), None)
3018        self.assertEqual(conn.recv_bytes(), msg)
3019
3020        if self.TYPE == 'processes':
3021            buffer = array.array('i', [0]*10)
3022            expected = list(arr) + [0] * (10 - len(arr))
3023            self.assertEqual(conn.send_bytes(arr), None)
3024            self.assertEqual(conn.recv_bytes_into(buffer),
3025                             len(arr) * buffer.itemsize)
3026            self.assertEqual(list(buffer), expected)
3027
3028            buffer = array.array('i', [0]*10)
3029            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
3030            self.assertEqual(conn.send_bytes(arr), None)
3031            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
3032                             len(arr) * buffer.itemsize)
3033            self.assertEqual(list(buffer), expected)
3034
3035            buffer = bytearray(latin(' ' * 40))
3036            self.assertEqual(conn.send_bytes(longmsg), None)
3037            try:
3038                res = conn.recv_bytes_into(buffer)
3039            except multiprocessing.BufferTooShort as e:
3040                self.assertEqual(e.args, (longmsg,))
3041            else:
3042                self.fail('expected BufferTooShort, got %s' % res)
3043
3044        poll = TimingWrapper(conn.poll)
3045
3046        self.assertEqual(poll(), False)
3047        self.assertTimingAlmostEqual(poll.elapsed, 0)
3048
3049        self.assertEqual(poll(-1), False)
3050        self.assertTimingAlmostEqual(poll.elapsed, 0)
3051
3052        self.assertEqual(poll(TIMEOUT1), False)
3053        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
3054
3055        conn.send(None)
3056        time.sleep(.1)
3057
3058        self.assertEqual(poll(TIMEOUT1), True)
3059        self.assertTimingAlmostEqual(poll.elapsed, 0)
3060
3061        self.assertEqual(conn.recv(), None)
3062
3063        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
3064        conn.send_bytes(really_big_msg)
3065        self.assertEqual(conn.recv_bytes(), really_big_msg)
3066
3067        conn.send_bytes(SENTINEL)                          # tell child to quit
3068        child_conn.close()
3069
3070        if self.TYPE == 'processes':
3071            self.assertEqual(conn.readable, True)
3072            self.assertEqual(conn.writable, True)
3073            self.assertRaises(EOFError, conn.recv)
3074            self.assertRaises(EOFError, conn.recv_bytes)
3075
3076        p.join()
3077
3078    def test_duplex_false(self):
3079        reader, writer = self.Pipe(duplex=False)
3080        self.assertEqual(writer.send(1), None)
3081        self.assertEqual(reader.recv(), 1)
3082        if self.TYPE == 'processes':
3083            self.assertEqual(reader.readable, True)
3084            self.assertEqual(reader.writable, False)
3085            self.assertEqual(writer.readable, False)
3086            self.assertEqual(writer.writable, True)
3087            self.assertRaises(OSError, reader.send, 2)
3088            self.assertRaises(OSError, writer.recv)
3089            self.assertRaises(OSError, writer.poll)
3090
3091    def test_spawn_close(self):
3092        # We test that a pipe connection can be closed by parent
3093        # process immediately after child is spawned.  On Windows this
3094        # would have sometimes failed on old versions because
3095        # child_conn would be closed before the child got a chance to
3096        # duplicate it.
3097        conn, child_conn = self.Pipe()
3098
3099        p = self.Process(target=self._echo, args=(child_conn,))
3100        p.daemon = True
3101        p.start()
3102        child_conn.close()    # this might complete before child initializes
3103
3104        msg = latin('hello')
3105        conn.send_bytes(msg)
3106        self.assertEqual(conn.recv_bytes(), msg)
3107
3108        conn.send_bytes(SENTINEL)
3109        conn.close()
3110        p.join()
3111
3112    def test_sendbytes(self):
3113        if self.TYPE != 'processes':
3114            self.skipTest('test not appropriate for {}'.format(self.TYPE))
3115
3116        msg = latin('abcdefghijklmnopqrstuvwxyz')
3117        a, b = self.Pipe()
3118
3119        a.send_bytes(msg)
3120        self.assertEqual(b.recv_bytes(), msg)
3121
3122        a.send_bytes(msg, 5)
3123        self.assertEqual(b.recv_bytes(), msg[5:])
3124
3125        a.send_bytes(msg, 7, 8)
3126        self.assertEqual(b.recv_bytes(), msg[7:7+8])
3127
3128        a.send_bytes(msg, 26)
3129        self.assertEqual(b.recv_bytes(), latin(''))
3130
3131        a.send_bytes(msg, 26, 0)
3132        self.assertEqual(b.recv_bytes(), latin(''))
3133
3134        self.assertRaises(ValueError, a.send_bytes, msg, 27)
3135
3136        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
3137
3138        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
3139
3140        self.assertRaises(ValueError, a.send_bytes, msg, -1)
3141
3142        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
3143
3144    @classmethod
3145    def _is_fd_assigned(cls, fd):
3146        try:
3147            os.fstat(fd)
3148        except OSError as e:
3149            if e.errno == errno.EBADF:
3150                return False
3151            raise
3152        else:
3153            return True
3154
3155    @classmethod
3156    def _writefd(cls, conn, data, create_dummy_fds=False):
3157        if create_dummy_fds:
3158            for i in range(0, 256):
3159                if not cls._is_fd_assigned(i):
3160                    os.dup2(conn.fileno(), i)
3161        fd = reduction.recv_handle(conn)
3162        if msvcrt:
3163            fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
3164        os.write(fd, data)
3165        os.close(fd)
3166
3167    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
3168    def test_fd_transfer(self):
3169        if self.TYPE != 'processes':
3170            self.skipTest("only makes sense with processes")
3171        conn, child_conn = self.Pipe(duplex=True)
3172
3173        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
3174        p.daemon = True
3175        p.start()
3176        self.addCleanup(test.support.unlink, test.support.TESTFN)
3177        with open(test.support.TESTFN, "wb") as f:
3178            fd = f.fileno()
3179            if msvcrt:
3180                fd = msvcrt.get_osfhandle(fd)
3181            reduction.send_handle(conn, fd, p.pid)
3182        p.join()
3183        with open(test.support.TESTFN, "rb") as f:
3184            self.assertEqual(f.read(), b"foo")
3185
3186    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
3187    @unittest.skipIf(sys.platform == "win32",
3188                     "test semantics don't make sense on Windows")
3189    @unittest.skipIf(MAXFD <= 256,
3190                     "largest assignable fd number is too small")
3191    @unittest.skipUnless(hasattr(os, "dup2"),
3192                         "test needs os.dup2()")
3193    def test_large_fd_transfer(self):
3194        # With fd > 256 (issue #11657)
3195        if self.TYPE != 'processes':
3196            self.skipTest("only makes sense with processes")
3197        conn, child_conn = self.Pipe(duplex=True)
3198
3199        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
3200        p.daemon = True
3201        p.start()
3202        self.addCleanup(test.support.unlink, test.support.TESTFN)
3203        with open(test.support.TESTFN, "wb") as f:
3204            fd = f.fileno()
3205            for newfd in range(256, MAXFD):
3206                if not self._is_fd_assigned(newfd):
3207                    break
3208            else:
3209                self.fail("could not find an unassigned large file descriptor")
3210            os.dup2(fd, newfd)
3211            try:
3212                reduction.send_handle(conn, newfd, p.pid)
3213            finally:
3214                os.close(newfd)
3215        p.join()
3216        with open(test.support.TESTFN, "rb") as f:
3217            self.assertEqual(f.read(), b"bar")
3218
3219    @classmethod
3220    def _send_data_without_fd(self, conn):
3221        os.write(conn.fileno(), b"\0")
3222
3223    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
3224    @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
3225    def test_missing_fd_transfer(self):
3226        # Check that exception is raised when received data is not
3227        # accompanied by a file descriptor in ancillary data.
3228        if self.TYPE != 'processes':
3229            self.skipTest("only makes sense with processes")
3230        conn, child_conn = self.Pipe(duplex=True)
3231
3232        p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
3233        p.daemon = True
3234        p.start()
3235        self.assertRaises(RuntimeError, reduction.recv_handle, conn)
3236        p.join()
3237
3238    def test_context(self):
3239        a, b = self.Pipe()
3240
3241        with a, b:
3242            a.send(1729)
3243            self.assertEqual(b.recv(), 1729)
3244            if self.TYPE == 'processes':
3245                self.assertFalse(a.closed)
3246                self.assertFalse(b.closed)
3247
3248        if self.TYPE == 'processes':
3249            self.assertTrue(a.closed)
3250            self.assertTrue(b.closed)
3251            self.assertRaises(OSError, a.recv)
3252            self.assertRaises(OSError, b.recv)
3253
3254class _TestListener(BaseTestCase):
3255
3256    ALLOWED_TYPES = ('processes',)
3257
3258    def test_multiple_bind(self):
3259        for family in self.connection.families:
3260            l = self.connection.Listener(family=family)
3261            self.addCleanup(l.close)
3262            self.assertRaises(OSError, self.connection.Listener,
3263                              l.address, family)
3264
3265    def test_context(self):
3266        with self.connection.Listener() as l:
3267            with self.connection.Client(l.address) as c:
3268                with l.accept() as d:
3269                    c.send(1729)
3270                    self.assertEqual(d.recv(), 1729)
3271
3272        if self.TYPE == 'processes':
3273            self.assertRaises(OSError, l.accept)
3274
3275class _TestListenerClient(BaseTestCase):
3276
3277    ALLOWED_TYPES = ('processes', 'threads')
3278
3279    @classmethod
3280    def _test(cls, address):
3281        conn = cls.connection.Client(address)
3282        conn.send('hello')
3283        conn.close()
3284
3285    def test_listener_client(self):
3286        for family in self.connection.families:
3287            l = self.connection.Listener(family=family)
3288            p = self.Process(target=self._test, args=(l.address,))
3289            p.daemon = True
3290            p.start()
3291            conn = l.accept()
3292            self.assertEqual(conn.recv(), 'hello')
3293            p.join()
3294            l.close()
3295
3296    def test_issue14725(self):
3297        l = self.connection.Listener()
3298        p = self.Process(target=self._test, args=(l.address,))
3299        p.daemon = True
3300        p.start()
3301        time.sleep(1)
3302        # On Windows the client process should by now have connected,
3303        # written data and closed the pipe handle by now.  This causes
3304        # ConnectNamdedPipe() to fail with ERROR_NO_DATA.  See Issue
3305        # 14725.
3306        conn = l.accept()
3307        self.assertEqual(conn.recv(), 'hello')
3308        conn.close()
3309        p.join()
3310        l.close()
3311
3312    def test_issue16955(self):
3313        for fam in self.connection.families:
3314            l = self.connection.Listener(family=fam)
3315            c = self.connection.Client(l.address)
3316            a = l.accept()
3317            a.send_bytes(b"hello")
3318            self.assertTrue(c.poll(1))
3319            a.close()
3320            c.close()
3321            l.close()
3322
3323class _TestPoll(BaseTestCase):
3324
3325    ALLOWED_TYPES = ('processes', 'threads')
3326
3327    def test_empty_string(self):
3328        a, b = self.Pipe()
3329        self.assertEqual(a.poll(), False)
3330        b.send_bytes(b'')
3331        self.assertEqual(a.poll(), True)
3332        self.assertEqual(a.poll(), True)
3333
3334    @classmethod
3335    def _child_strings(cls, conn, strings):
3336        for s in strings:
3337            time.sleep(0.1)
3338            conn.send_bytes(s)
3339        conn.close()
3340
3341    def test_strings(self):
3342        strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
3343        a, b = self.Pipe()
3344        p = self.Process(target=self._child_strings, args=(b, strings))
3345        p.start()
3346
3347        for s in strings:
3348            for i in range(200):
3349                if a.poll(0.01):
3350                    break
3351            x = a.recv_bytes()
3352            self.assertEqual(s, x)
3353
3354        p.join()
3355
3356    @classmethod
3357    def _child_boundaries(cls, r):
3358        # Polling may "pull" a message in to the child process, but we
3359        # don't want it to pull only part of a message, as that would
3360        # corrupt the pipe for any other processes which might later
3361        # read from it.
3362        r.poll(5)
3363
3364    def test_boundaries(self):
3365        r, w = self.Pipe(False)
3366        p = self.Process(target=self._child_boundaries, args=(r,))
3367        p.start()
3368        time.sleep(2)
3369        L = [b"first", b"second"]
3370        for obj in L:
3371            w.send_bytes(obj)
3372        w.close()
3373        p.join()
3374        self.assertIn(r.recv_bytes(), L)
3375
3376    @classmethod
3377    def _child_dont_merge(cls, b):
3378        b.send_bytes(b'a')
3379        b.send_bytes(b'b')
3380        b.send_bytes(b'cd')
3381
3382    def test_dont_merge(self):
3383        a, b = self.Pipe()
3384        self.assertEqual(a.poll(0.0), False)
3385        self.assertEqual(a.poll(0.1), False)
3386
3387        p = self.Process(target=self._child_dont_merge, args=(b,))
3388        p.start()
3389
3390        self.assertEqual(a.recv_bytes(), b'a')
3391        self.assertEqual(a.poll(1.0), True)
3392        self.assertEqual(a.poll(1.0), True)
3393        self.assertEqual(a.recv_bytes(), b'b')
3394        self.assertEqual(a.poll(1.0), True)
3395        self.assertEqual(a.poll(1.0), True)
3396        self.assertEqual(a.poll(0.0), True)
3397        self.assertEqual(a.recv_bytes(), b'cd')
3398
3399        p.join()
3400
3401#
3402# Test of sending connection and socket objects between processes
3403#
3404
3405@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
3406class _TestPicklingConnections(BaseTestCase):
3407
3408    ALLOWED_TYPES = ('processes',)
3409
3410    @classmethod
3411    def tearDownClass(cls):
3412        from multiprocessing import resource_sharer
3413        resource_sharer.stop(timeout=TIMEOUT)
3414
3415    @classmethod
3416    def _listener(cls, conn, families):
3417        for fam in families:
3418            l = cls.connection.Listener(family=fam)
3419            conn.send(l.address)
3420            new_conn = l.accept()
3421            conn.send(new_conn)
3422            new_conn.close()
3423            l.close()
3424
3425        l = socket.create_server((test.support.HOST, 0))
3426        conn.send(l.getsockname())
3427        new_conn, addr = l.accept()
3428        conn.send(new_conn)
3429        new_conn.close()
3430        l.close()
3431
3432        conn.recv()
3433
3434    @classmethod
3435    def _remote(cls, conn):
3436        for (address, msg) in iter(conn.recv, None):
3437            client = cls.connection.Client(address)
3438            client.send(msg.upper())
3439            client.close()
3440
3441        address, msg = conn.recv()
3442        client = socket.socket()
3443        client.connect(address)
3444        client.sendall(msg.upper())
3445        client.close()
3446
3447        conn.close()
3448
3449    def test_pickling(self):
3450        families = self.connection.families
3451
3452        lconn, lconn0 = self.Pipe()
3453        lp = self.Process(target=self._listener, args=(lconn0, families))
3454        lp.daemon = True
3455        lp.start()
3456        lconn0.close()
3457
3458        rconn, rconn0 = self.Pipe()
3459        rp = self.Process(target=self._remote, args=(rconn0,))
3460        rp.daemon = True
3461        rp.start()
3462        rconn0.close()
3463
3464        for fam in families:
3465            msg = ('This connection uses family %s' % fam).encode('ascii')
3466            address = lconn.recv()
3467            rconn.send((address, msg))
3468            new_conn = lconn.recv()
3469            self.assertEqual(new_conn.recv(), msg.upper())
3470
3471        rconn.send(None)
3472
3473        msg = latin('This connection uses a normal socket')
3474        address = lconn.recv()
3475        rconn.send((address, msg))
3476        new_conn = lconn.recv()
3477        buf = []
3478        while True:
3479            s = new_conn.recv(100)
3480            if not s:
3481                break
3482            buf.append(s)
3483        buf = b''.join(buf)
3484        self.assertEqual(buf, msg.upper())
3485        new_conn.close()
3486
3487        lconn.send(None)
3488
3489        rconn.close()
3490        lconn.close()
3491
3492        lp.join()
3493        rp.join()
3494
3495    @classmethod
3496    def child_access(cls, conn):
3497        w = conn.recv()
3498        w.send('all is well')
3499        w.close()
3500
3501        r = conn.recv()
3502        msg = r.recv()
3503        conn.send(msg*2)
3504
3505        conn.close()
3506
3507    def test_access(self):
3508        # On Windows, if we do not specify a destination pid when
3509        # using DupHandle then we need to be careful to use the
3510        # correct access flags for DuplicateHandle(), or else
3511        # DupHandle.detach() will raise PermissionError.  For example,
3512        # for a read only pipe handle we should use
3513        # access=FILE_GENERIC_READ.  (Unfortunately
3514        # DUPLICATE_SAME_ACCESS does not work.)
3515        conn, child_conn = self.Pipe()
3516        p = self.Process(target=self.child_access, args=(child_conn,))
3517        p.daemon = True
3518        p.start()
3519        child_conn.close()
3520
3521        r, w = self.Pipe(duplex=False)
3522        conn.send(w)
3523        w.close()
3524        self.assertEqual(r.recv(), 'all is well')
3525        r.close()
3526
3527        r, w = self.Pipe(duplex=False)
3528        conn.send(r)
3529        r.close()
3530        w.send('foobar')
3531        w.close()
3532        self.assertEqual(conn.recv(), 'foobar'*2)
3533
3534        p.join()
3535
3536#
3537#
3538#
3539
3540class _TestHeap(BaseTestCase):
3541
3542    ALLOWED_TYPES = ('processes',)
3543
3544    def setUp(self):
3545        super().setUp()
3546        # Make pristine heap for these tests
3547        self.old_heap = multiprocessing.heap.BufferWrapper._heap
3548        multiprocessing.heap.BufferWrapper._heap = multiprocessing.heap.Heap()
3549
3550    def tearDown(self):
3551        multiprocessing.heap.BufferWrapper._heap = self.old_heap
3552        super().tearDown()
3553
3554    def test_heap(self):
3555        iterations = 5000
3556        maxblocks = 50
3557        blocks = []
3558
3559        # get the heap object
3560        heap = multiprocessing.heap.BufferWrapper._heap
3561        heap._DISCARD_FREE_SPACE_LARGER_THAN = 0
3562
3563        # create and destroy lots of blocks of different sizes
3564        for i in range(iterations):
3565            size = int(random.lognormvariate(0, 1) * 1000)
3566            b = multiprocessing.heap.BufferWrapper(size)
3567            blocks.append(b)
3568            if len(blocks) > maxblocks:
3569                i = random.randrange(maxblocks)
3570                del blocks[i]
3571            del b
3572
3573        # verify the state of the heap
3574        with heap._lock:
3575            all = []
3576            free = 0
3577            occupied = 0
3578            for L in list(heap._len_to_seq.values()):
3579                # count all free blocks in arenas
3580                for arena, start, stop in L:
3581                    all.append((heap._arenas.index(arena), start, stop,
3582                                stop-start, 'free'))
3583                    free += (stop-start)
3584            for arena, arena_blocks in heap._allocated_blocks.items():
3585                # count all allocated blocks in arenas
3586                for start, stop in arena_blocks:
3587                    all.append((heap._arenas.index(arena), start, stop,
3588                                stop-start, 'occupied'))
3589                    occupied += (stop-start)
3590
3591            self.assertEqual(free + occupied,
3592                             sum(arena.size for arena in heap._arenas))
3593
3594            all.sort()
3595
3596            for i in range(len(all)-1):
3597                (arena, start, stop) = all[i][:3]
3598                (narena, nstart, nstop) = all[i+1][:3]
3599                if arena != narena:
3600                    # Two different arenas
3601                    self.assertEqual(stop, heap._arenas[arena].size)  # last block
3602                    self.assertEqual(nstart, 0)         # first block
3603                else:
3604                    # Same arena: two adjacent blocks
3605                    self.assertEqual(stop, nstart)
3606
3607        # test free'ing all blocks
3608        random.shuffle(blocks)
3609        while blocks:
3610            blocks.pop()
3611
3612        self.assertEqual(heap._n_frees, heap._n_mallocs)
3613        self.assertEqual(len(heap._pending_free_blocks), 0)
3614        self.assertEqual(len(heap._arenas), 0)
3615        self.assertEqual(len(heap._allocated_blocks), 0, heap._allocated_blocks)
3616        self.assertEqual(len(heap._len_to_seq), 0)
3617
3618    def test_free_from_gc(self):
3619        # Check that freeing of blocks by the garbage collector doesn't deadlock
3620        # (issue #12352).
3621        # Make sure the GC is enabled, and set lower collection thresholds to
3622        # make collections more frequent (and increase the probability of
3623        # deadlock).
3624        if not gc.isenabled():
3625            gc.enable()
3626            self.addCleanup(gc.disable)
3627        thresholds = gc.get_threshold()
3628        self.addCleanup(gc.set_threshold, *thresholds)
3629        gc.set_threshold(10)
3630
3631        # perform numerous block allocations, with cyclic references to make
3632        # sure objects are collected asynchronously by the gc
3633        for i in range(5000):
3634            a = multiprocessing.heap.BufferWrapper(1)
3635            b = multiprocessing.heap.BufferWrapper(1)
3636            # circular references
3637            a.buddy = b
3638            b.buddy = a
3639
3640#
3641#
3642#
3643
3644class _Foo(Structure):
3645    _fields_ = [
3646        ('x', c_int),
3647        ('y', c_double),
3648        ('z', c_longlong,)
3649        ]
3650
3651class _TestSharedCTypes(BaseTestCase):
3652
3653    ALLOWED_TYPES = ('processes',)
3654
3655    def setUp(self):
3656        if not HAS_SHAREDCTYPES:
3657            self.skipTest("requires multiprocessing.sharedctypes")
3658
3659    @classmethod
3660    def _double(cls, x, y, z, foo, arr, string):
3661        x.value *= 2
3662        y.value *= 2
3663        z.value *= 2
3664        foo.x *= 2
3665        foo.y *= 2
3666        string.value *= 2
3667        for i in range(len(arr)):
3668            arr[i] *= 2
3669
3670    def test_sharedctypes(self, lock=False):
3671        x = Value('i', 7, lock=lock)
3672        y = Value(c_double, 1.0/3.0, lock=lock)
3673        z = Value(c_longlong, 2 ** 33, lock=lock)
3674        foo = Value(_Foo, 3, 2, lock=lock)
3675        arr = self.Array('d', list(range(10)), lock=lock)
3676        string = self.Array('c', 20, lock=lock)
3677        string.value = latin('hello')
3678
3679        p = self.Process(target=self._double, args=(x, y, z, foo, arr, string))
3680        p.daemon = True
3681        p.start()
3682        p.join()
3683
3684        self.assertEqual(x.value, 14)
3685        self.assertAlmostEqual(y.value, 2.0/3.0)
3686        self.assertEqual(z.value, 2 ** 34)
3687        self.assertEqual(foo.x, 6)
3688        self.assertAlmostEqual(foo.y, 4.0)
3689        for i in range(10):
3690            self.assertAlmostEqual(arr[i], i*2)
3691        self.assertEqual(string.value, latin('hellohello'))
3692
3693    def test_synchronize(self):
3694        self.test_sharedctypes(lock=True)
3695
3696    def test_copy(self):
3697        foo = _Foo(2, 5.0, 2 ** 33)
3698        bar = copy(foo)
3699        foo.x = 0
3700        foo.y = 0
3701        foo.z = 0
3702        self.assertEqual(bar.x, 2)
3703        self.assertAlmostEqual(bar.y, 5.0)
3704        self.assertEqual(bar.z, 2 ** 33)
3705
3706
3707@unittest.skipUnless(HAS_SHMEM, "requires multiprocessing.shared_memory")
3708class _TestSharedMemory(BaseTestCase):
3709
3710    ALLOWED_TYPES = ('processes',)
3711
3712    @staticmethod
3713    def _attach_existing_shmem_then_write(shmem_name_or_obj, binary_data):
3714        if isinstance(shmem_name_or_obj, str):
3715            local_sms = shared_memory.SharedMemory(shmem_name_or_obj)
3716        else:
3717            local_sms = shmem_name_or_obj
3718        local_sms.buf[:len(binary_data)] = binary_data
3719        local_sms.close()
3720
3721    def test_shared_memory_basics(self):
3722        sms = shared_memory.SharedMemory('test01_tsmb', create=True, size=512)
3723        self.addCleanup(sms.unlink)
3724
3725        # Verify attributes are readable.
3726        self.assertEqual(sms.name, 'test01_tsmb')
3727        self.assertGreaterEqual(sms.size, 512)
3728        self.assertGreaterEqual(len(sms.buf), sms.size)
3729
3730        # Modify contents of shared memory segment through memoryview.
3731        sms.buf[0] = 42
3732        self.assertEqual(sms.buf[0], 42)
3733
3734        # Attach to existing shared memory segment.
3735        also_sms = shared_memory.SharedMemory('test01_tsmb')
3736        self.assertEqual(also_sms.buf[0], 42)
3737        also_sms.close()
3738
3739        # Attach to existing shared memory segment but specify a new size.
3740        same_sms = shared_memory.SharedMemory('test01_tsmb', size=20*sms.size)
3741        self.assertLess(same_sms.size, 20*sms.size)  # Size was ignored.
3742        same_sms.close()
3743
3744        if shared_memory._USE_POSIX:
3745            # Posix Shared Memory can only be unlinked once.  Here we
3746            # test an implementation detail that is not observed across
3747            # all supported platforms (since WindowsNamedSharedMemory
3748            # manages unlinking on its own and unlink() does nothing).
3749            # True release of shared memory segment does not necessarily
3750            # happen until process exits, depending on the OS platform.
3751            with self.assertRaises(FileNotFoundError):
3752                sms_uno = shared_memory.SharedMemory(
3753                    'test01_dblunlink',
3754                    create=True,
3755                    size=5000
3756                )
3757
3758                try:
3759                    self.assertGreaterEqual(sms_uno.size, 5000)
3760
3761                    sms_duo = shared_memory.SharedMemory('test01_dblunlink')
3762                    sms_duo.unlink()  # First shm_unlink() call.
3763                    sms_duo.close()
3764                    sms_uno.close()
3765
3766                finally:
3767                    sms_uno.unlink()  # A second shm_unlink() call is bad.
3768
3769        with self.assertRaises(FileExistsError):
3770            # Attempting to create a new shared memory segment with a
3771            # name that is already in use triggers an exception.
3772            there_can_only_be_one_sms = shared_memory.SharedMemory(
3773                'test01_tsmb',
3774                create=True,
3775                size=512
3776            )
3777
3778        if shared_memory._USE_POSIX:
3779            # Requesting creation of a shared memory segment with the option
3780            # to attach to an existing segment, if that name is currently in
3781            # use, should not trigger an exception.
3782            # Note:  Using a smaller size could possibly cause truncation of
3783            # the existing segment but is OS platform dependent.  In the
3784            # case of MacOS/darwin, requesting a smaller size is disallowed.
3785            class OptionalAttachSharedMemory(shared_memory.SharedMemory):
3786                _flags = os.O_CREAT | os.O_RDWR
3787            ok_if_exists_sms = OptionalAttachSharedMemory('test01_tsmb')
3788            self.assertEqual(ok_if_exists_sms.size, sms.size)
3789            ok_if_exists_sms.close()
3790
3791        # Attempting to attach to an existing shared memory segment when
3792        # no segment exists with the supplied name triggers an exception.
3793        with self.assertRaises(FileNotFoundError):
3794            nonexisting_sms = shared_memory.SharedMemory('test01_notthere')
3795            nonexisting_sms.unlink()  # Error should occur on prior line.
3796
3797        sms.close()
3798
3799    def test_shared_memory_across_processes(self):
3800        sms = shared_memory.SharedMemory('test02_tsmap', True, size=512)
3801        self.addCleanup(sms.unlink)
3802
3803        # Verify remote attachment to existing block by name is working.
3804        p = self.Process(
3805            target=self._attach_existing_shmem_then_write,
3806            args=(sms.name, b'howdy')
3807        )
3808        p.daemon = True
3809        p.start()
3810        p.join()
3811        self.assertEqual(bytes(sms.buf[:5]), b'howdy')
3812
3813        # Verify pickling of SharedMemory instance also works.
3814        p = self.Process(
3815            target=self._attach_existing_shmem_then_write,
3816            args=(sms, b'HELLO')
3817        )
3818        p.daemon = True
3819        p.start()
3820        p.join()
3821        self.assertEqual(bytes(sms.buf[:5]), b'HELLO')
3822
3823        sms.close()
3824
3825    @unittest.skipIf(os.name != "posix", "not feasible in non-posix platforms")
3826    def test_shared_memory_SharedMemoryServer_ignores_sigint(self):
3827        # bpo-36368: protect SharedMemoryManager server process from
3828        # KeyboardInterrupt signals.
3829        smm = multiprocessing.managers.SharedMemoryManager()
3830        smm.start()
3831
3832        # make sure the manager works properly at the beginning
3833        sl = smm.ShareableList(range(10))
3834
3835        # the manager's server should ignore KeyboardInterrupt signals, and
3836        # maintain its connection with the current process, and success when
3837        # asked to deliver memory segments.
3838        os.kill(smm._process.pid, signal.SIGINT)
3839
3840        sl2 = smm.ShareableList(range(10))
3841
3842        # test that the custom signal handler registered in the Manager does
3843        # not affect signal handling in the parent process.
3844        with self.assertRaises(KeyboardInterrupt):
3845            os.kill(os.getpid(), signal.SIGINT)
3846
3847        smm.shutdown()
3848
3849    @unittest.skipIf(os.name != "posix", "resource_tracker is posix only")
3850    def test_shared_memory_SharedMemoryManager_reuses_resource_tracker(self):
3851        # bpo-36867: test that a SharedMemoryManager uses the
3852        # same resource_tracker process as its parent.
3853        cmd = '''if 1:
3854            from multiprocessing.managers import SharedMemoryManager
3855
3856
3857            smm = SharedMemoryManager()
3858            smm.start()
3859            sl = smm.ShareableList(range(10))
3860            smm.shutdown()
3861        '''
3862        rc, out, err = test.support.script_helper.assert_python_ok('-c', cmd)
3863
3864        # Before bpo-36867 was fixed, a SharedMemoryManager not using the same
3865        # resource_tracker process as its parent would make the parent's
3866        # tracker complain about sl being leaked even though smm.shutdown()
3867        # properly released sl.
3868        self.assertFalse(err)
3869
3870    def test_shared_memory_SharedMemoryManager_basics(self):
3871        smm1 = multiprocessing.managers.SharedMemoryManager()
3872        with self.assertRaises(ValueError):
3873            smm1.SharedMemory(size=9)  # Fails if SharedMemoryServer not started
3874        smm1.start()
3875        lol = [ smm1.ShareableList(range(i)) for i in range(5, 10) ]
3876        lom = [ smm1.SharedMemory(size=j) for j in range(32, 128, 16) ]
3877        doppleganger_list0 = shared_memory.ShareableList(name=lol[0].shm.name)
3878        self.assertEqual(len(doppleganger_list0), 5)
3879        doppleganger_shm0 = shared_memory.SharedMemory(name=lom[0].name)
3880        self.assertGreaterEqual(len(doppleganger_shm0.buf), 32)
3881        held_name = lom[0].name
3882        smm1.shutdown()
3883        if sys.platform != "win32":
3884            # Calls to unlink() have no effect on Windows platform; shared
3885            # memory will only be released once final process exits.
3886            with self.assertRaises(FileNotFoundError):
3887                # No longer there to be attached to again.
3888                absent_shm = shared_memory.SharedMemory(name=held_name)
3889
3890        with multiprocessing.managers.SharedMemoryManager() as smm2:
3891            sl = smm2.ShareableList("howdy")
3892            shm = smm2.SharedMemory(size=128)
3893            held_name = sl.shm.name
3894        if sys.platform != "win32":
3895            with self.assertRaises(FileNotFoundError):
3896                # No longer there to be attached to again.
3897                absent_sl = shared_memory.ShareableList(name=held_name)
3898
3899
3900    def test_shared_memory_ShareableList_basics(self):
3901        sl = shared_memory.ShareableList(
3902            ['howdy', b'HoWdY', -273.154, 100, None, True, 42]
3903        )
3904        self.addCleanup(sl.shm.unlink)
3905
3906        # Verify attributes are readable.
3907        self.assertEqual(sl.format, '8s8sdqxxxxxx?xxxxxxxx?q')
3908
3909        # Exercise len().
3910        self.assertEqual(len(sl), 7)
3911
3912        # Exercise index().
3913        with warnings.catch_warnings():
3914            # Suppress BytesWarning when comparing against b'HoWdY'.
3915            warnings.simplefilter('ignore')
3916            with self.assertRaises(ValueError):
3917                sl.index('100')
3918            self.assertEqual(sl.index(100), 3)
3919
3920        # Exercise retrieving individual values.
3921        self.assertEqual(sl[0], 'howdy')
3922        self.assertEqual(sl[-2], True)
3923
3924        # Exercise iterability.
3925        self.assertEqual(
3926            tuple(sl),
3927            ('howdy', b'HoWdY', -273.154, 100, None, True, 42)
3928        )
3929
3930        # Exercise modifying individual values.
3931        sl[3] = 42
3932        self.assertEqual(sl[3], 42)
3933        sl[4] = 'some'  # Change type at a given position.
3934        self.assertEqual(sl[4], 'some')
3935        self.assertEqual(sl.format, '8s8sdq8sxxxxxxx?q')
3936        with self.assertRaises(ValueError):
3937            sl[4] = 'far too many'  # Exceeds available storage.
3938        self.assertEqual(sl[4], 'some')
3939
3940        # Exercise count().
3941        with warnings.catch_warnings():
3942            # Suppress BytesWarning when comparing against b'HoWdY'.
3943            warnings.simplefilter('ignore')
3944            self.assertEqual(sl.count(42), 2)
3945            self.assertEqual(sl.count(b'HoWdY'), 1)
3946            self.assertEqual(sl.count(b'adios'), 0)
3947
3948        # Exercise creating a duplicate.
3949        sl_copy = shared_memory.ShareableList(sl, name='test03_duplicate')
3950        try:
3951            self.assertNotEqual(sl.shm.name, sl_copy.shm.name)
3952            self.assertEqual('test03_duplicate', sl_copy.shm.name)
3953            self.assertEqual(list(sl), list(sl_copy))
3954            self.assertEqual(sl.format, sl_copy.format)
3955            sl_copy[-1] = 77
3956            self.assertEqual(sl_copy[-1], 77)
3957            self.assertNotEqual(sl[-1], 77)
3958            sl_copy.shm.close()
3959        finally:
3960            sl_copy.shm.unlink()
3961
3962        # Obtain a second handle on the same ShareableList.
3963        sl_tethered = shared_memory.ShareableList(name=sl.shm.name)
3964        self.assertEqual(sl.shm.name, sl_tethered.shm.name)
3965        sl_tethered[-1] = 880
3966        self.assertEqual(sl[-1], 880)
3967        sl_tethered.shm.close()
3968
3969        sl.shm.close()
3970
3971        # Exercise creating an empty ShareableList.
3972        empty_sl = shared_memory.ShareableList()
3973        try:
3974            self.assertEqual(len(empty_sl), 0)
3975            self.assertEqual(empty_sl.format, '')
3976            self.assertEqual(empty_sl.count('any'), 0)
3977            with self.assertRaises(ValueError):
3978                empty_sl.index(None)
3979            empty_sl.shm.close()
3980        finally:
3981            empty_sl.shm.unlink()
3982
3983    def test_shared_memory_ShareableList_pickling(self):
3984        sl = shared_memory.ShareableList(range(10))
3985        self.addCleanup(sl.shm.unlink)
3986
3987        serialized_sl = pickle.dumps(sl)
3988        deserialized_sl = pickle.loads(serialized_sl)
3989        self.assertTrue(
3990            isinstance(deserialized_sl, shared_memory.ShareableList)
3991        )
3992        self.assertTrue(deserialized_sl[-1], 9)
3993        self.assertFalse(sl is deserialized_sl)
3994        deserialized_sl[4] = "changed"
3995        self.assertEqual(sl[4], "changed")
3996
3997        # Verify data is not being put into the pickled representation.
3998        name = 'a' * len(sl.shm.name)
3999        larger_sl = shared_memory.ShareableList(range(400))
4000        self.addCleanup(larger_sl.shm.unlink)
4001        serialized_larger_sl = pickle.dumps(larger_sl)
4002        self.assertTrue(len(serialized_sl) == len(serialized_larger_sl))
4003        larger_sl.shm.close()
4004
4005        deserialized_sl.shm.close()
4006        sl.shm.close()
4007
4008    def test_shared_memory_cleaned_after_process_termination(self):
4009        cmd = '''if 1:
4010            import os, time, sys
4011            from multiprocessing import shared_memory
4012
4013            # Create a shared_memory segment, and send the segment name
4014            sm = shared_memory.SharedMemory(create=True, size=10)
4015            sys.stdout.write(sm.name + '\\n')
4016            sys.stdout.flush()
4017            time.sleep(100)
4018        '''
4019        with subprocess.Popen([sys.executable, '-E', '-c', cmd],
4020                              stdout=subprocess.PIPE,
4021                              stderr=subprocess.PIPE) as p:
4022            name = p.stdout.readline().strip().decode()
4023
4024            # killing abruptly processes holding reference to a shared memory
4025            # segment should not leak the given memory segment.
4026            p.terminate()
4027            p.wait()
4028
4029            deadline = time.monotonic() + 60
4030            t = 0.1
4031            while time.monotonic() < deadline:
4032                time.sleep(t)
4033                t = min(t*2, 5)
4034                try:
4035                    smm = shared_memory.SharedMemory(name, create=False)
4036                except FileNotFoundError:
4037                    break
4038            else:
4039                raise AssertionError("A SharedMemory segment was leaked after"
4040                                     " a process was abruptly terminated.")
4041
4042            if os.name == 'posix':
4043                # A warning was emitted by the subprocess' own
4044                # resource_tracker (on Windows, shared memory segments
4045                # are released automatically by the OS).
4046                err = p.stderr.read().decode()
4047                self.assertIn(
4048                    "resource_tracker: There appear to be 1 leaked "
4049                    "shared_memory objects to clean up at shutdown", err)
4050
4051#
4052#
4053#
4054
4055class _TestFinalize(BaseTestCase):
4056
4057    ALLOWED_TYPES = ('processes',)
4058
4059    def setUp(self):
4060        self.registry_backup = util._finalizer_registry.copy()
4061        util._finalizer_registry.clear()
4062
4063    def tearDown(self):
4064        self.assertFalse(util._finalizer_registry)
4065        util._finalizer_registry.update(self.registry_backup)
4066
4067    @classmethod
4068    def _test_finalize(cls, conn):
4069        class Foo(object):
4070            pass
4071
4072        a = Foo()
4073        util.Finalize(a, conn.send, args=('a',))
4074        del a           # triggers callback for a
4075
4076        b = Foo()
4077        close_b = util.Finalize(b, conn.send, args=('b',))
4078        close_b()       # triggers callback for b
4079        close_b()       # does nothing because callback has already been called
4080        del b           # does nothing because callback has already been called
4081
4082        c = Foo()
4083        util.Finalize(c, conn.send, args=('c',))
4084
4085        d10 = Foo()
4086        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
4087
4088        d01 = Foo()
4089        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
4090        d02 = Foo()
4091        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
4092        d03 = Foo()
4093        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
4094
4095        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
4096
4097        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
4098
4099        # call multiprocessing's cleanup function then exit process without
4100        # garbage collecting locals
4101        util._exit_function()
4102        conn.close()
4103        os._exit(0)
4104
4105    def test_finalize(self):
4106        conn, child_conn = self.Pipe()
4107
4108        p = self.Process(target=self._test_finalize, args=(child_conn,))
4109        p.daemon = True
4110        p.start()
4111        p.join()
4112
4113        result = [obj for obj in iter(conn.recv, 'STOP')]
4114        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
4115
4116    def test_thread_safety(self):
4117        # bpo-24484: _run_finalizers() should be thread-safe
4118        def cb():
4119            pass
4120
4121        class Foo(object):
4122            def __init__(self):
4123                self.ref = self  # create reference cycle
4124                # insert finalizer at random key
4125                util.Finalize(self, cb, exitpriority=random.randint(1, 100))
4126
4127        finish = False
4128        exc = None
4129
4130        def run_finalizers():
4131            nonlocal exc
4132            while not finish:
4133                time.sleep(random.random() * 1e-1)
4134                try:
4135                    # A GC run will eventually happen during this,
4136                    # collecting stale Foo's and mutating the registry
4137                    util._run_finalizers()
4138                except Exception as e:
4139                    exc = e
4140
4141        def make_finalizers():
4142            nonlocal exc
4143            d = {}
4144            while not finish:
4145                try:
4146                    # Old Foo's get gradually replaced and later
4147                    # collected by the GC (because of the cyclic ref)
4148                    d[random.getrandbits(5)] = {Foo() for i in range(10)}
4149                except Exception as e:
4150                    exc = e
4151                    d.clear()
4152
4153        old_interval = sys.getswitchinterval()
4154        old_threshold = gc.get_threshold()
4155        try:
4156            sys.setswitchinterval(1e-6)
4157            gc.set_threshold(5, 5, 5)
4158            threads = [threading.Thread(target=run_finalizers),
4159                       threading.Thread(target=make_finalizers)]
4160            with test.support.start_threads(threads):
4161                time.sleep(4.0)  # Wait a bit to trigger race condition
4162                finish = True
4163            if exc is not None:
4164                raise exc
4165        finally:
4166            sys.setswitchinterval(old_interval)
4167            gc.set_threshold(*old_threshold)
4168            gc.collect()  # Collect remaining Foo's
4169
4170
4171#
4172# Test that from ... import * works for each module
4173#
4174
4175class _TestImportStar(unittest.TestCase):
4176
4177    def get_module_names(self):
4178        import glob
4179        folder = os.path.dirname(multiprocessing.__file__)
4180        pattern = os.path.join(folder, '*.py')
4181        files = glob.glob(pattern)
4182        modules = [os.path.splitext(os.path.split(f)[1])[0] for f in files]
4183        modules = ['multiprocessing.' + m for m in modules]
4184        modules.remove('multiprocessing.__init__')
4185        modules.append('multiprocessing')
4186        return modules
4187
4188    def test_import(self):
4189        modules = self.get_module_names()
4190        if sys.platform == 'win32':
4191            modules.remove('multiprocessing.popen_fork')
4192            modules.remove('multiprocessing.popen_forkserver')
4193            modules.remove('multiprocessing.popen_spawn_posix')
4194        else:
4195            modules.remove('multiprocessing.popen_spawn_win32')
4196            if not HAS_REDUCTION:
4197                modules.remove('multiprocessing.popen_forkserver')
4198
4199        if c_int is None:
4200            # This module requires _ctypes
4201            modules.remove('multiprocessing.sharedctypes')
4202
4203        for name in modules:
4204            __import__(name)
4205            mod = sys.modules[name]
4206            self.assertTrue(hasattr(mod, '__all__'), name)
4207
4208            for attr in mod.__all__:
4209                self.assertTrue(
4210                    hasattr(mod, attr),
4211                    '%r does not have attribute %r' % (mod, attr)
4212                    )
4213
4214#
4215# Quick test that logging works -- does not test logging output
4216#
4217
4218class _TestLogging(BaseTestCase):
4219
4220    ALLOWED_TYPES = ('processes',)
4221
4222    def test_enable_logging(self):
4223        logger = multiprocessing.get_logger()
4224        logger.setLevel(util.SUBWARNING)
4225        self.assertTrue(logger is not None)
4226        logger.debug('this will not be printed')
4227        logger.info('nor will this')
4228        logger.setLevel(LOG_LEVEL)
4229
4230    @classmethod
4231    def _test_level(cls, conn):
4232        logger = multiprocessing.get_logger()
4233        conn.send(logger.getEffectiveLevel())
4234
4235    def test_level(self):
4236        LEVEL1 = 32
4237        LEVEL2 = 37
4238
4239        logger = multiprocessing.get_logger()
4240        root_logger = logging.getLogger()
4241        root_level = root_logger.level
4242
4243        reader, writer = multiprocessing.Pipe(duplex=False)
4244
4245        logger.setLevel(LEVEL1)
4246        p = self.Process(target=self._test_level, args=(writer,))
4247        p.start()
4248        self.assertEqual(LEVEL1, reader.recv())
4249        p.join()
4250        p.close()
4251
4252        logger.setLevel(logging.NOTSET)
4253        root_logger.setLevel(LEVEL2)
4254        p = self.Process(target=self._test_level, args=(writer,))
4255        p.start()
4256        self.assertEqual(LEVEL2, reader.recv())
4257        p.join()
4258        p.close()
4259
4260        root_logger.setLevel(root_level)
4261        logger.setLevel(level=LOG_LEVEL)
4262
4263
4264# class _TestLoggingProcessName(BaseTestCase):
4265#
4266#     def handle(self, record):
4267#         assert record.processName == multiprocessing.current_process().name
4268#         self.__handled = True
4269#
4270#     def test_logging(self):
4271#         handler = logging.Handler()
4272#         handler.handle = self.handle
4273#         self.__handled = False
4274#         # Bypass getLogger() and side-effects
4275#         logger = logging.getLoggerClass()(
4276#                 'multiprocessing.test.TestLoggingProcessName')
4277#         logger.addHandler(handler)
4278#         logger.propagate = False
4279#
4280#         logger.warn('foo')
4281#         assert self.__handled
4282
4283#
4284# Check that Process.join() retries if os.waitpid() fails with EINTR
4285#
4286
4287class _TestPollEintr(BaseTestCase):
4288
4289    ALLOWED_TYPES = ('processes',)
4290
4291    @classmethod
4292    def _killer(cls, pid):
4293        time.sleep(0.1)
4294        os.kill(pid, signal.SIGUSR1)
4295
4296    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
4297    def test_poll_eintr(self):
4298        got_signal = [False]
4299        def record(*args):
4300            got_signal[0] = True
4301        pid = os.getpid()
4302        oldhandler = signal.signal(signal.SIGUSR1, record)
4303        try:
4304            killer = self.Process(target=self._killer, args=(pid,))
4305            killer.start()
4306            try:
4307                p = self.Process(target=time.sleep, args=(2,))
4308                p.start()
4309                p.join()
4310            finally:
4311                killer.join()
4312            self.assertTrue(got_signal[0])
4313            self.assertEqual(p.exitcode, 0)
4314        finally:
4315            signal.signal(signal.SIGUSR1, oldhandler)
4316
4317#
4318# Test to verify handle verification, see issue 3321
4319#
4320
4321class TestInvalidHandle(unittest.TestCase):
4322
4323    @unittest.skipIf(WIN32, "skipped on Windows")
4324    def test_invalid_handles(self):
4325        conn = multiprocessing.connection.Connection(44977608)
4326        # check that poll() doesn't crash
4327        try:
4328            conn.poll()
4329        except (ValueError, OSError):
4330            pass
4331        finally:
4332            # Hack private attribute _handle to avoid printing an error
4333            # in conn.__del__
4334            conn._handle = None
4335        self.assertRaises((ValueError, OSError),
4336                          multiprocessing.connection.Connection, -1)
4337
4338
4339
4340class OtherTest(unittest.TestCase):
4341    # TODO: add more tests for deliver/answer challenge.
4342    def test_deliver_challenge_auth_failure(self):
4343        class _FakeConnection(object):
4344            def recv_bytes(self, size):
4345                return b'something bogus'
4346            def send_bytes(self, data):
4347                pass
4348        self.assertRaises(multiprocessing.AuthenticationError,
4349                          multiprocessing.connection.deliver_challenge,
4350                          _FakeConnection(), b'abc')
4351
4352    def test_answer_challenge_auth_failure(self):
4353        class _FakeConnection(object):
4354            def __init__(self):
4355                self.count = 0
4356            def recv_bytes(self, size):
4357                self.count += 1
4358                if self.count == 1:
4359                    return multiprocessing.connection.CHALLENGE
4360                elif self.count == 2:
4361                    return b'something bogus'
4362                return b''
4363            def send_bytes(self, data):
4364                pass
4365        self.assertRaises(multiprocessing.AuthenticationError,
4366                          multiprocessing.connection.answer_challenge,
4367                          _FakeConnection(), b'abc')
4368
4369#
4370# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
4371#
4372
4373def initializer(ns):
4374    ns.test += 1
4375
4376class TestInitializers(unittest.TestCase):
4377    def setUp(self):
4378        self.mgr = multiprocessing.Manager()
4379        self.ns = self.mgr.Namespace()
4380        self.ns.test = 0
4381
4382    def tearDown(self):
4383        self.mgr.shutdown()
4384        self.mgr.join()
4385
4386    def test_manager_initializer(self):
4387        m = multiprocessing.managers.SyncManager()
4388        self.assertRaises(TypeError, m.start, 1)
4389        m.start(initializer, (self.ns,))
4390        self.assertEqual(self.ns.test, 1)
4391        m.shutdown()
4392        m.join()
4393
4394    def test_pool_initializer(self):
4395        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
4396        p = multiprocessing.Pool(1, initializer, (self.ns,))
4397        p.close()
4398        p.join()
4399        self.assertEqual(self.ns.test, 1)
4400
4401#
4402# Issue 5155, 5313, 5331: Test process in processes
4403# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
4404#
4405
4406def _this_sub_process(q):
4407    try:
4408        item = q.get(block=False)
4409    except pyqueue.Empty:
4410        pass
4411
4412def _test_process():
4413    queue = multiprocessing.Queue()
4414    subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,))
4415    subProc.daemon = True
4416    subProc.start()
4417    subProc.join()
4418
4419def _afunc(x):
4420    return x*x
4421
4422def pool_in_process():
4423    pool = multiprocessing.Pool(processes=4)
4424    x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
4425    pool.close()
4426    pool.join()
4427
4428class _file_like(object):
4429    def __init__(self, delegate):
4430        self._delegate = delegate
4431        self._pid = None
4432
4433    @property
4434    def cache(self):
4435        pid = os.getpid()
4436        # There are no race conditions since fork keeps only the running thread
4437        if pid != self._pid:
4438            self._pid = pid
4439            self._cache = []
4440        return self._cache
4441
4442    def write(self, data):
4443        self.cache.append(data)
4444
4445    def flush(self):
4446        self._delegate.write(''.join(self.cache))
4447        self._cache = []
4448
4449class TestStdinBadfiledescriptor(unittest.TestCase):
4450
4451    def test_queue_in_process(self):
4452        proc = multiprocessing.Process(target=_test_process)
4453        proc.start()
4454        proc.join()
4455
4456    def test_pool_in_process(self):
4457        p = multiprocessing.Process(target=pool_in_process)
4458        p.start()
4459        p.join()
4460
4461    def test_flushing(self):
4462        sio = io.StringIO()
4463        flike = _file_like(sio)
4464        flike.write('foo')
4465        proc = multiprocessing.Process(target=lambda: flike.flush())
4466        flike.flush()
4467        assert sio.getvalue() == 'foo'
4468
4469
4470class TestWait(unittest.TestCase):
4471
4472    @classmethod
4473    def _child_test_wait(cls, w, slow):
4474        for i in range(10):
4475            if slow:
4476                time.sleep(random.random()*0.1)
4477            w.send((i, os.getpid()))
4478        w.close()
4479
4480    def test_wait(self, slow=False):
4481        from multiprocessing.connection import wait
4482        readers = []
4483        procs = []
4484        messages = []
4485
4486        for i in range(4):
4487            r, w = multiprocessing.Pipe(duplex=False)
4488            p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow))
4489            p.daemon = True
4490            p.start()
4491            w.close()
4492            readers.append(r)
4493            procs.append(p)
4494            self.addCleanup(p.join)
4495
4496        while readers:
4497            for r in wait(readers):
4498                try:
4499                    msg = r.recv()
4500                except EOFError:
4501                    readers.remove(r)
4502                    r.close()
4503                else:
4504                    messages.append(msg)
4505
4506        messages.sort()
4507        expected = sorted((i, p.pid) for i in range(10) for p in procs)
4508        self.assertEqual(messages, expected)
4509
4510    @classmethod
4511    def _child_test_wait_socket(cls, address, slow):
4512        s = socket.socket()
4513        s.connect(address)
4514        for i in range(10):
4515            if slow:
4516                time.sleep(random.random()*0.1)
4517            s.sendall(('%s\n' % i).encode('ascii'))
4518        s.close()
4519
4520    def test_wait_socket(self, slow=False):
4521        from multiprocessing.connection import wait
4522        l = socket.create_server((test.support.HOST, 0))
4523        addr = l.getsockname()
4524        readers = []
4525        procs = []
4526        dic = {}
4527
4528        for i in range(4):
4529            p = multiprocessing.Process(target=self._child_test_wait_socket,
4530                                        args=(addr, slow))
4531            p.daemon = True
4532            p.start()
4533            procs.append(p)
4534            self.addCleanup(p.join)
4535
4536        for i in range(4):
4537            r, _ = l.accept()
4538            readers.append(r)
4539            dic[r] = []
4540        l.close()
4541
4542        while readers:
4543            for r in wait(readers):
4544                msg = r.recv(32)
4545                if not msg:
4546                    readers.remove(r)
4547                    r.close()
4548                else:
4549                    dic[r].append(msg)
4550
4551        expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
4552        for v in dic.values():
4553            self.assertEqual(b''.join(v), expected)
4554
4555    def test_wait_slow(self):
4556        self.test_wait(True)
4557
4558    def test_wait_socket_slow(self):
4559        self.test_wait_socket(True)
4560
4561    def test_wait_timeout(self):
4562        from multiprocessing.connection import wait
4563
4564        expected = 5
4565        a, b = multiprocessing.Pipe()
4566
4567        start = time.monotonic()
4568        res = wait([a, b], expected)
4569        delta = time.monotonic() - start
4570
4571        self.assertEqual(res, [])
4572        self.assertLess(delta, expected * 2)
4573        self.assertGreater(delta, expected * 0.5)
4574
4575        b.send(None)
4576
4577        start = time.monotonic()
4578        res = wait([a, b], 20)
4579        delta = time.monotonic() - start
4580
4581        self.assertEqual(res, [a])
4582        self.assertLess(delta, 0.4)
4583
4584    @classmethod
4585    def signal_and_sleep(cls, sem, period):
4586        sem.release()
4587        time.sleep(period)
4588
4589    def test_wait_integer(self):
4590        from multiprocessing.connection import wait
4591
4592        expected = 3
4593        sorted_ = lambda l: sorted(l, key=lambda x: id(x))
4594        sem = multiprocessing.Semaphore(0)
4595        a, b = multiprocessing.Pipe()
4596        p = multiprocessing.Process(target=self.signal_and_sleep,
4597                                    args=(sem, expected))
4598
4599        p.start()
4600        self.assertIsInstance(p.sentinel, int)
4601        self.assertTrue(sem.acquire(timeout=20))
4602
4603        start = time.monotonic()
4604        res = wait([a, p.sentinel, b], expected + 20)
4605        delta = time.monotonic() - start
4606
4607        self.assertEqual(res, [p.sentinel])
4608        self.assertLess(delta, expected + 2)
4609        self.assertGreater(delta, expected - 2)
4610
4611        a.send(None)
4612
4613        start = time.monotonic()
4614        res = wait([a, p.sentinel, b], 20)
4615        delta = time.monotonic() - start
4616
4617        self.assertEqual(sorted_(res), sorted_([p.sentinel, b]))
4618        self.assertLess(delta, 0.4)
4619
4620        b.send(None)
4621
4622        start = time.monotonic()
4623        res = wait([a, p.sentinel, b], 20)
4624        delta = time.monotonic() - start
4625
4626        self.assertEqual(sorted_(res), sorted_([a, p.sentinel, b]))
4627        self.assertLess(delta, 0.4)
4628
4629        p.terminate()
4630        p.join()
4631
4632    def test_neg_timeout(self):
4633        from multiprocessing.connection import wait
4634        a, b = multiprocessing.Pipe()
4635        t = time.monotonic()
4636        res = wait([a], timeout=-1)
4637        t = time.monotonic() - t
4638        self.assertEqual(res, [])
4639        self.assertLess(t, 1)
4640        a.close()
4641        b.close()
4642
4643#
4644# Issue 14151: Test invalid family on invalid environment
4645#
4646
4647class TestInvalidFamily(unittest.TestCase):
4648
4649    @unittest.skipIf(WIN32, "skipped on Windows")
4650    def test_invalid_family(self):
4651        with self.assertRaises(ValueError):
4652            multiprocessing.connection.Listener(r'\\.\test')
4653
4654    @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
4655    def test_invalid_family_win32(self):
4656        with self.assertRaises(ValueError):
4657            multiprocessing.connection.Listener('/var/test.pipe')
4658
4659#
4660# Issue 12098: check sys.flags of child matches that for parent
4661#
4662
4663class TestFlags(unittest.TestCase):
4664    @classmethod
4665    def run_in_grandchild(cls, conn):
4666        conn.send(tuple(sys.flags))
4667
4668    @classmethod
4669    def run_in_child(cls):
4670        import json
4671        r, w = multiprocessing.Pipe(duplex=False)
4672        p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,))
4673        p.start()
4674        grandchild_flags = r.recv()
4675        p.join()
4676        r.close()
4677        w.close()
4678        flags = (tuple(sys.flags), grandchild_flags)
4679        print(json.dumps(flags))
4680
4681    def test_flags(self):
4682        import json
4683        # start child process using unusual flags
4684        prog = ('from test._test_multiprocessing import TestFlags; ' +
4685                'TestFlags.run_in_child()')
4686        data = subprocess.check_output(
4687            [sys.executable, '-E', '-S', '-O', '-c', prog])
4688        child_flags, grandchild_flags = json.loads(data.decode('ascii'))
4689        self.assertEqual(child_flags, grandchild_flags)
4690
4691#
4692# Test interaction with socket timeouts - see Issue #6056
4693#
4694
4695class TestTimeouts(unittest.TestCase):
4696    @classmethod
4697    def _test_timeout(cls, child, address):
4698        time.sleep(1)
4699        child.send(123)
4700        child.close()
4701        conn = multiprocessing.connection.Client(address)
4702        conn.send(456)
4703        conn.close()
4704
4705    def test_timeout(self):
4706        old_timeout = socket.getdefaulttimeout()
4707        try:
4708            socket.setdefaulttimeout(0.1)
4709            parent, child = multiprocessing.Pipe(duplex=True)
4710            l = multiprocessing.connection.Listener(family='AF_INET')
4711            p = multiprocessing.Process(target=self._test_timeout,
4712                                        args=(child, l.address))
4713            p.start()
4714            child.close()
4715            self.assertEqual(parent.recv(), 123)
4716            parent.close()
4717            conn = l.accept()
4718            self.assertEqual(conn.recv(), 456)
4719            conn.close()
4720            l.close()
4721            join_process(p)
4722        finally:
4723            socket.setdefaulttimeout(old_timeout)
4724
4725#
4726# Test what happens with no "if __name__ == '__main__'"
4727#
4728
4729class TestNoForkBomb(unittest.TestCase):
4730    def test_noforkbomb(self):
4731        sm = multiprocessing.get_start_method()
4732        name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
4733        if sm != 'fork':
4734            rc, out, err = test.support.script_helper.assert_python_failure(name, sm)
4735            self.assertEqual(out, b'')
4736            self.assertIn(b'RuntimeError', err)
4737        else:
4738            rc, out, err = test.support.script_helper.assert_python_ok(name, sm)
4739            self.assertEqual(out.rstrip(), b'123')
4740            self.assertEqual(err, b'')
4741
4742#
4743# Issue #17555: ForkAwareThreadLock
4744#
4745
4746class TestForkAwareThreadLock(unittest.TestCase):
4747    # We recursively start processes.  Issue #17555 meant that the
4748    # after fork registry would get duplicate entries for the same
4749    # lock.  The size of the registry at generation n was ~2**n.
4750
4751    @classmethod
4752    def child(cls, n, conn):
4753        if n > 1:
4754            p = multiprocessing.Process(target=cls.child, args=(n-1, conn))
4755            p.start()
4756            conn.close()
4757            join_process(p)
4758        else:
4759            conn.send(len(util._afterfork_registry))
4760        conn.close()
4761
4762    def test_lock(self):
4763        r, w = multiprocessing.Pipe(False)
4764        l = util.ForkAwareThreadLock()
4765        old_size = len(util._afterfork_registry)
4766        p = multiprocessing.Process(target=self.child, args=(5, w))
4767        p.start()
4768        w.close()
4769        new_size = r.recv()
4770        join_process(p)
4771        self.assertLessEqual(new_size, old_size)
4772
4773#
4774# Check that non-forked child processes do not inherit unneeded fds/handles
4775#
4776
4777class TestCloseFds(unittest.TestCase):
4778
4779    def get_high_socket_fd(self):
4780        if WIN32:
4781            # The child process will not have any socket handles, so
4782            # calling socket.fromfd() should produce WSAENOTSOCK even
4783            # if there is a handle of the same number.
4784            return socket.socket().detach()
4785        else:
4786            # We want to produce a socket with an fd high enough that a
4787            # freshly created child process will not have any fds as high.
4788            fd = socket.socket().detach()
4789            to_close = []
4790            while fd < 50:
4791                to_close.append(fd)
4792                fd = os.dup(fd)
4793            for x in to_close:
4794                os.close(x)
4795            return fd
4796
4797    def close(self, fd):
4798        if WIN32:
4799            socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=fd).close()
4800        else:
4801            os.close(fd)
4802
4803    @classmethod
4804    def _test_closefds(cls, conn, fd):
4805        try:
4806            s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
4807        except Exception as e:
4808            conn.send(e)
4809        else:
4810            s.close()
4811            conn.send(None)
4812
4813    def test_closefd(self):
4814        if not HAS_REDUCTION:
4815            raise unittest.SkipTest('requires fd pickling')
4816
4817        reader, writer = multiprocessing.Pipe()
4818        fd = self.get_high_socket_fd()
4819        try:
4820            p = multiprocessing.Process(target=self._test_closefds,
4821                                        args=(writer, fd))
4822            p.start()
4823            writer.close()
4824            e = reader.recv()
4825            join_process(p)
4826        finally:
4827            self.close(fd)
4828            writer.close()
4829            reader.close()
4830
4831        if multiprocessing.get_start_method() == 'fork':
4832            self.assertIs(e, None)
4833        else:
4834            WSAENOTSOCK = 10038
4835            self.assertIsInstance(e, OSError)
4836            self.assertTrue(e.errno == errno.EBADF or
4837                            e.winerror == WSAENOTSOCK, e)
4838
4839#
4840# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
4841#
4842
4843class TestIgnoreEINTR(unittest.TestCase):
4844
4845    # Sending CONN_MAX_SIZE bytes into a multiprocessing pipe must block
4846    CONN_MAX_SIZE = max(support.PIPE_MAX_SIZE, support.SOCK_MAX_SIZE)
4847
4848    @classmethod
4849    def _test_ignore(cls, conn):
4850        def handler(signum, frame):
4851            pass
4852        signal.signal(signal.SIGUSR1, handler)
4853        conn.send('ready')
4854        x = conn.recv()
4855        conn.send(x)
4856        conn.send_bytes(b'x' * cls.CONN_MAX_SIZE)
4857
4858    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
4859    def test_ignore(self):
4860        conn, child_conn = multiprocessing.Pipe()
4861        try:
4862            p = multiprocessing.Process(target=self._test_ignore,
4863                                        args=(child_conn,))
4864            p.daemon = True
4865            p.start()
4866            child_conn.close()
4867            self.assertEqual(conn.recv(), 'ready')
4868            time.sleep(0.1)
4869            os.kill(p.pid, signal.SIGUSR1)
4870            time.sleep(0.1)
4871            conn.send(1234)
4872            self.assertEqual(conn.recv(), 1234)
4873            time.sleep(0.1)
4874            os.kill(p.pid, signal.SIGUSR1)
4875            self.assertEqual(conn.recv_bytes(), b'x' * self.CONN_MAX_SIZE)
4876            time.sleep(0.1)
4877            p.join()
4878        finally:
4879            conn.close()
4880
4881    @classmethod
4882    def _test_ignore_listener(cls, conn):
4883        def handler(signum, frame):
4884            pass
4885        signal.signal(signal.SIGUSR1, handler)
4886        with multiprocessing.connection.Listener() as l:
4887            conn.send(l.address)
4888            a = l.accept()
4889            a.send('welcome')
4890
4891    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
4892    def test_ignore_listener(self):
4893        conn, child_conn = multiprocessing.Pipe()
4894        try:
4895            p = multiprocessing.Process(target=self._test_ignore_listener,
4896                                        args=(child_conn,))
4897            p.daemon = True
4898            p.start()
4899            child_conn.close()
4900            address = conn.recv()
4901            time.sleep(0.1)
4902            os.kill(p.pid, signal.SIGUSR1)
4903            time.sleep(0.1)
4904            client = multiprocessing.connection.Client(address)
4905            self.assertEqual(client.recv(), 'welcome')
4906            p.join()
4907        finally:
4908            conn.close()
4909
4910class TestStartMethod(unittest.TestCase):
4911    @classmethod
4912    def _check_context(cls, conn):
4913        conn.send(multiprocessing.get_start_method())
4914
4915    def check_context(self, ctx):
4916        r, w = ctx.Pipe(duplex=False)
4917        p = ctx.Process(target=self._check_context, args=(w,))
4918        p.start()
4919        w.close()
4920        child_method = r.recv()
4921        r.close()
4922        p.join()
4923        self.assertEqual(child_method, ctx.get_start_method())
4924
4925    def test_context(self):
4926        for method in ('fork', 'spawn', 'forkserver'):
4927            try:
4928                ctx = multiprocessing.get_context(method)
4929            except ValueError:
4930                continue
4931            self.assertEqual(ctx.get_start_method(), method)
4932            self.assertIs(ctx.get_context(), ctx)
4933            self.assertRaises(ValueError, ctx.set_start_method, 'spawn')
4934            self.assertRaises(ValueError, ctx.set_start_method, None)
4935            self.check_context(ctx)
4936
4937    def test_set_get(self):
4938        multiprocessing.set_forkserver_preload(PRELOAD)
4939        count = 0
4940        old_method = multiprocessing.get_start_method()
4941        try:
4942            for method in ('fork', 'spawn', 'forkserver'):
4943                try:
4944                    multiprocessing.set_start_method(method, force=True)
4945                except ValueError:
4946                    continue
4947                self.assertEqual(multiprocessing.get_start_method(), method)
4948                ctx = multiprocessing.get_context()
4949                self.assertEqual(ctx.get_start_method(), method)
4950                self.assertTrue(type(ctx).__name__.lower().startswith(method))
4951                self.assertTrue(
4952                    ctx.Process.__name__.lower().startswith(method))
4953                self.check_context(multiprocessing)
4954                count += 1
4955        finally:
4956            multiprocessing.set_start_method(old_method, force=True)
4957        self.assertGreaterEqual(count, 1)
4958
4959    def test_get_all(self):
4960        methods = multiprocessing.get_all_start_methods()
4961        if sys.platform == 'win32':
4962            self.assertEqual(methods, ['spawn'])
4963        else:
4964            self.assertTrue(methods == ['fork', 'spawn'] or
4965                            methods == ['fork', 'spawn', 'forkserver'])
4966
4967    def test_preload_resources(self):
4968        if multiprocessing.get_start_method() != 'forkserver':
4969            self.skipTest("test only relevant for 'forkserver' method")
4970        name = os.path.join(os.path.dirname(__file__), 'mp_preload.py')
4971        rc, out, err = test.support.script_helper.assert_python_ok(name)
4972        out = out.decode()
4973        err = err.decode()
4974        if out.rstrip() != 'ok' or err != '':
4975            print(out)
4976            print(err)
4977            self.fail("failed spawning forkserver or grandchild")
4978
4979
4980@unittest.skipIf(sys.platform == "win32",
4981                 "test semantics don't make sense on Windows")
4982class TestResourceTracker(unittest.TestCase):
4983
4984    def test_resource_tracker(self):
4985        #
4986        # Check that killing process does not leak named semaphores
4987        #
4988        cmd = '''if 1:
4989            import time, os, tempfile
4990            import multiprocessing as mp
4991            from multiprocessing import resource_tracker
4992            from multiprocessing.shared_memory import SharedMemory
4993
4994            mp.set_start_method("spawn")
4995            rand = tempfile._RandomNameSequence()
4996
4997
4998            def create_and_register_resource(rtype):
4999                if rtype == "semaphore":
5000                    lock = mp.Lock()
5001                    return lock, lock._semlock.name
5002                elif rtype == "shared_memory":
5003                    sm = SharedMemory(create=True, size=10)
5004                    return sm, sm._name
5005                else:
5006                    raise ValueError(
5007                        "Resource type {{}} not understood".format(rtype))
5008
5009
5010            resource1, rname1 = create_and_register_resource("{rtype}")
5011            resource2, rname2 = create_and_register_resource("{rtype}")
5012
5013            os.write({w}, rname1.encode("ascii") + b"\\n")
5014            os.write({w}, rname2.encode("ascii") + b"\\n")
5015
5016            time.sleep(10)
5017        '''
5018        for rtype in resource_tracker._CLEANUP_FUNCS:
5019            with self.subTest(rtype=rtype):
5020                if rtype == "noop":
5021                    # Artefact resource type used by the resource_tracker
5022                    continue
5023                r, w = os.pipe()
5024                p = subprocess.Popen([sys.executable,
5025                                     '-E', '-c', cmd.format(w=w, rtype=rtype)],
5026                                     pass_fds=[w],
5027                                     stderr=subprocess.PIPE)
5028                os.close(w)
5029                with open(r, 'rb', closefd=True) as f:
5030                    name1 = f.readline().rstrip().decode('ascii')
5031                    name2 = f.readline().rstrip().decode('ascii')
5032                _resource_unlink(name1, rtype)
5033                p.terminate()
5034                p.wait()
5035
5036                deadline = time.monotonic() + 60
5037                while time.monotonic() < deadline:
5038                    time.sleep(.5)
5039                    try:
5040                        _resource_unlink(name2, rtype)
5041                    except OSError as e:
5042                        # docs say it should be ENOENT, but OSX seems to give
5043                        # EINVAL
5044                        self.assertIn(e.errno, (errno.ENOENT, errno.EINVAL))
5045                        break
5046                else:
5047                    raise AssertionError(
5048                        f"A {rtype} resource was leaked after a process was "
5049                        f"abruptly terminated.")
5050                err = p.stderr.read().decode('utf-8')
5051                p.stderr.close()
5052                expected = ('resource_tracker: There appear to be 2 leaked {} '
5053                            'objects'.format(
5054                            rtype))
5055                self.assertRegex(err, expected)
5056                self.assertRegex(err, r'resource_tracker: %r: \[Errno' % name1)
5057
5058    def check_resource_tracker_death(self, signum, should_die):
5059        # bpo-31310: if the semaphore tracker process has died, it should
5060        # be restarted implicitly.
5061        from multiprocessing.resource_tracker import _resource_tracker
5062        pid = _resource_tracker._pid
5063        if pid is not None:
5064            os.kill(pid, signal.SIGKILL)
5065            os.waitpid(pid, 0)
5066        with warnings.catch_warnings():
5067            warnings.simplefilter("ignore")
5068            _resource_tracker.ensure_running()
5069        pid = _resource_tracker._pid
5070
5071        os.kill(pid, signum)
5072        time.sleep(1.0)  # give it time to die
5073
5074        ctx = multiprocessing.get_context("spawn")
5075        with warnings.catch_warnings(record=True) as all_warn:
5076            warnings.simplefilter("always")
5077            sem = ctx.Semaphore()
5078            sem.acquire()
5079            sem.release()
5080            wr = weakref.ref(sem)
5081            # ensure `sem` gets collected, which triggers communication with
5082            # the semaphore tracker
5083            del sem
5084            gc.collect()
5085            self.assertIsNone(wr())
5086            if should_die:
5087                self.assertEqual(len(all_warn), 1)
5088                the_warn = all_warn[0]
5089                self.assertTrue(issubclass(the_warn.category, UserWarning))
5090                self.assertTrue("resource_tracker: process died"
5091                                in str(the_warn.message))
5092            else:
5093                self.assertEqual(len(all_warn), 0)
5094
5095    def test_resource_tracker_sigint(self):
5096        # Catchable signal (ignored by semaphore tracker)
5097        self.check_resource_tracker_death(signal.SIGINT, False)
5098
5099    def test_resource_tracker_sigterm(self):
5100        # Catchable signal (ignored by semaphore tracker)
5101        self.check_resource_tracker_death(signal.SIGTERM, False)
5102
5103    def test_resource_tracker_sigkill(self):
5104        # Uncatchable signal.
5105        self.check_resource_tracker_death(signal.SIGKILL, True)
5106
5107    @staticmethod
5108    def _is_resource_tracker_reused(conn, pid):
5109        from multiprocessing.resource_tracker import _resource_tracker
5110        _resource_tracker.ensure_running()
5111        # The pid should be None in the child process, expect for the fork
5112        # context. It should not be a new value.
5113        reused = _resource_tracker._pid in (None, pid)
5114        reused &= _resource_tracker._check_alive()
5115        conn.send(reused)
5116
5117    def test_resource_tracker_reused(self):
5118        from multiprocessing.resource_tracker import _resource_tracker
5119        _resource_tracker.ensure_running()
5120        pid = _resource_tracker._pid
5121
5122        r, w = multiprocessing.Pipe(duplex=False)
5123        p = multiprocessing.Process(target=self._is_resource_tracker_reused,
5124                                    args=(w, pid))
5125        p.start()
5126        is_resource_tracker_reused = r.recv()
5127
5128        # Clean up
5129        p.join()
5130        w.close()
5131        r.close()
5132
5133        self.assertTrue(is_resource_tracker_reused)
5134
5135
5136class TestSimpleQueue(unittest.TestCase):
5137
5138    @classmethod
5139    def _test_empty(cls, queue, child_can_start, parent_can_continue):
5140        child_can_start.wait()
5141        # issue 30301, could fail under spawn and forkserver
5142        try:
5143            queue.put(queue.empty())
5144            queue.put(queue.empty())
5145        finally:
5146            parent_can_continue.set()
5147
5148    def test_empty(self):
5149        queue = multiprocessing.SimpleQueue()
5150        child_can_start = multiprocessing.Event()
5151        parent_can_continue = multiprocessing.Event()
5152
5153        proc = multiprocessing.Process(
5154            target=self._test_empty,
5155            args=(queue, child_can_start, parent_can_continue)
5156        )
5157        proc.daemon = True
5158        proc.start()
5159
5160        self.assertTrue(queue.empty())
5161
5162        child_can_start.set()
5163        parent_can_continue.wait()
5164
5165        self.assertFalse(queue.empty())
5166        self.assertEqual(queue.get(), True)
5167        self.assertEqual(queue.get(), False)
5168        self.assertTrue(queue.empty())
5169
5170        proc.join()
5171
5172
5173class TestPoolNotLeakOnFailure(unittest.TestCase):
5174
5175    def test_release_unused_processes(self):
5176        # Issue #19675: During pool creation, if we can't create a process,
5177        # don't leak already created ones.
5178        will_fail_in = 3
5179        forked_processes = []
5180
5181        class FailingForkProcess:
5182            def __init__(self, **kwargs):
5183                self.name = 'Fake Process'
5184                self.exitcode = None
5185                self.state = None
5186                forked_processes.append(self)
5187
5188            def start(self):
5189                nonlocal will_fail_in
5190                if will_fail_in <= 0:
5191                    raise OSError("Manually induced OSError")
5192                will_fail_in -= 1
5193                self.state = 'started'
5194
5195            def terminate(self):
5196                self.state = 'stopping'
5197
5198            def join(self):
5199                if self.state == 'stopping':
5200                    self.state = 'stopped'
5201
5202            def is_alive(self):
5203                return self.state == 'started' or self.state == 'stopping'
5204
5205        with self.assertRaisesRegex(OSError, 'Manually induced OSError'):
5206            p = multiprocessing.pool.Pool(5, context=unittest.mock.MagicMock(
5207                Process=FailingForkProcess))
5208            p.close()
5209            p.join()
5210        self.assertFalse(
5211            any(process.is_alive() for process in forked_processes))
5212
5213
5214class TestSyncManagerTypes(unittest.TestCase):
5215    """Test all the types which can be shared between a parent and a
5216    child process by using a manager which acts as an intermediary
5217    between them.
5218
5219    In the following unit-tests the base type is created in the parent
5220    process, the @classmethod represents the worker process and the
5221    shared object is readable and editable between the two.
5222
5223    # The child.
5224    @classmethod
5225    def _test_list(cls, obj):
5226        assert obj[0] == 5
5227        assert obj.append(6)
5228
5229    # The parent.
5230    def test_list(self):
5231        o = self.manager.list()
5232        o.append(5)
5233        self.run_worker(self._test_list, o)
5234        assert o[1] == 6
5235    """
5236    manager_class = multiprocessing.managers.SyncManager
5237
5238    def setUp(self):
5239        self.manager = self.manager_class()
5240        self.manager.start()
5241        self.proc = None
5242
5243    def tearDown(self):
5244        if self.proc is not None and self.proc.is_alive():
5245            self.proc.terminate()
5246            self.proc.join()
5247        self.manager.shutdown()
5248        self.manager = None
5249        self.proc = None
5250
5251    @classmethod
5252    def setUpClass(cls):
5253        support.reap_children()
5254
5255    tearDownClass = setUpClass
5256
5257    def wait_proc_exit(self):
5258        # Only the manager process should be returned by active_children()
5259        # but this can take a bit on slow machines, so wait a few seconds
5260        # if there are other children too (see #17395).
5261        join_process(self.proc)
5262        start_time = time.monotonic()
5263        t = 0.01
5264        while len(multiprocessing.active_children()) > 1:
5265            time.sleep(t)
5266            t *= 2
5267            dt = time.monotonic() - start_time
5268            if dt >= 5.0:
5269                test.support.environment_altered = True
5270                print("Warning -- multiprocessing.Manager still has %s active "
5271                      "children after %s seconds"
5272                      % (multiprocessing.active_children(), dt),
5273                      file=sys.stderr)
5274                break
5275
5276    def run_worker(self, worker, obj):
5277        self.proc = multiprocessing.Process(target=worker, args=(obj, ))
5278        self.proc.daemon = True
5279        self.proc.start()
5280        self.wait_proc_exit()
5281        self.assertEqual(self.proc.exitcode, 0)
5282
5283    @classmethod
5284    def _test_event(cls, obj):
5285        assert obj.is_set()
5286        obj.wait()
5287        obj.clear()
5288        obj.wait(0.001)
5289
5290    def test_event(self):
5291        o = self.manager.Event()
5292        o.set()
5293        self.run_worker(self._test_event, o)
5294        assert not o.is_set()
5295        o.wait(0.001)
5296
5297    @classmethod
5298    def _test_lock(cls, obj):
5299        obj.acquire()
5300
5301    def test_lock(self, lname="Lock"):
5302        o = getattr(self.manager, lname)()
5303        self.run_worker(self._test_lock, o)
5304        o.release()
5305        self.assertRaises(RuntimeError, o.release)  # already released
5306
5307    @classmethod
5308    def _test_rlock(cls, obj):
5309        obj.acquire()
5310        obj.release()
5311
5312    def test_rlock(self, lname="Lock"):
5313        o = getattr(self.manager, lname)()
5314        self.run_worker(self._test_rlock, o)
5315
5316    @classmethod
5317    def _test_semaphore(cls, obj):
5318        obj.acquire()
5319
5320    def test_semaphore(self, sname="Semaphore"):
5321        o = getattr(self.manager, sname)()
5322        self.run_worker(self._test_semaphore, o)
5323        o.release()
5324
5325    def test_bounded_semaphore(self):
5326        self.test_semaphore(sname="BoundedSemaphore")
5327
5328    @classmethod
5329    def _test_condition(cls, obj):
5330        obj.acquire()
5331        obj.release()
5332
5333    def test_condition(self):
5334        o = self.manager.Condition()
5335        self.run_worker(self._test_condition, o)
5336
5337    @classmethod
5338    def _test_barrier(cls, obj):
5339        assert obj.parties == 5
5340        obj.reset()
5341
5342    def test_barrier(self):
5343        o = self.manager.Barrier(5)
5344        self.run_worker(self._test_barrier, o)
5345
5346    @classmethod
5347    def _test_pool(cls, obj):
5348        # TODO: fix https://bugs.python.org/issue35919
5349        with obj:
5350            pass
5351
5352    def test_pool(self):
5353        o = self.manager.Pool(processes=4)
5354        self.run_worker(self._test_pool, o)
5355
5356    @classmethod
5357    def _test_queue(cls, obj):
5358        assert obj.qsize() == 2
5359        assert obj.full()
5360        assert not obj.empty()
5361        assert obj.get() == 5
5362        assert not obj.empty()
5363        assert obj.get() == 6
5364        assert obj.empty()
5365
5366    def test_queue(self, qname="Queue"):
5367        o = getattr(self.manager, qname)(2)
5368        o.put(5)
5369        o.put(6)
5370        self.run_worker(self._test_queue, o)
5371        assert o.empty()
5372        assert not o.full()
5373
5374    def test_joinable_queue(self):
5375        self.test_queue("JoinableQueue")
5376
5377    @classmethod
5378    def _test_list(cls, obj):
5379        assert obj[0] == 5
5380        assert obj.count(5) == 1
5381        assert obj.index(5) == 0
5382        obj.sort()
5383        obj.reverse()
5384        for x in obj:
5385            pass
5386        assert len(obj) == 1
5387        assert obj.pop(0) == 5
5388
5389    def test_list(self):
5390        o = self.manager.list()
5391        o.append(5)
5392        self.run_worker(self._test_list, o)
5393        assert not o
5394        self.assertEqual(len(o), 0)
5395
5396    @classmethod
5397    def _test_dict(cls, obj):
5398        assert len(obj) == 1
5399        assert obj['foo'] == 5
5400        assert obj.get('foo') == 5
5401        assert list(obj.items()) == [('foo', 5)]
5402        assert list(obj.keys()) == ['foo']
5403        assert list(obj.values()) == [5]
5404        assert obj.copy() == {'foo': 5}
5405        assert obj.popitem() == ('foo', 5)
5406
5407    def test_dict(self):
5408        o = self.manager.dict()
5409        o['foo'] = 5
5410        self.run_worker(self._test_dict, o)
5411        assert not o
5412        self.assertEqual(len(o), 0)
5413
5414    @classmethod
5415    def _test_value(cls, obj):
5416        assert obj.value == 1
5417        assert obj.get() == 1
5418        obj.set(2)
5419
5420    def test_value(self):
5421        o = self.manager.Value('i', 1)
5422        self.run_worker(self._test_value, o)
5423        self.assertEqual(o.value, 2)
5424        self.assertEqual(o.get(), 2)
5425
5426    @classmethod
5427    def _test_array(cls, obj):
5428        assert obj[0] == 0
5429        assert obj[1] == 1
5430        assert len(obj) == 2
5431        assert list(obj) == [0, 1]
5432
5433    def test_array(self):
5434        o = self.manager.Array('i', [0, 1])
5435        self.run_worker(self._test_array, o)
5436
5437    @classmethod
5438    def _test_namespace(cls, obj):
5439        assert obj.x == 0
5440        assert obj.y == 1
5441
5442    def test_namespace(self):
5443        o = self.manager.Namespace()
5444        o.x = 0
5445        o.y = 1
5446        self.run_worker(self._test_namespace, o)
5447
5448
5449class MiscTestCase(unittest.TestCase):
5450    def test__all__(self):
5451        # Just make sure names in blacklist are excluded
5452        support.check__all__(self, multiprocessing, extra=multiprocessing.__all__,
5453                             blacklist=['SUBDEBUG', 'SUBWARNING'])
5454#
5455# Mixins
5456#
5457
5458class BaseMixin(object):
5459    @classmethod
5460    def setUpClass(cls):
5461        cls.dangling = (multiprocessing.process._dangling.copy(),
5462                        threading._dangling.copy())
5463
5464    @classmethod
5465    def tearDownClass(cls):
5466        # bpo-26762: Some multiprocessing objects like Pool create reference
5467        # cycles. Trigger a garbage collection to break these cycles.
5468        test.support.gc_collect()
5469
5470        processes = set(multiprocessing.process._dangling) - set(cls.dangling[0])
5471        if processes:
5472            test.support.environment_altered = True
5473            print('Warning -- Dangling processes: %s' % processes,
5474                  file=sys.stderr)
5475        processes = None
5476
5477        threads = set(threading._dangling) - set(cls.dangling[1])
5478        if threads:
5479            test.support.environment_altered = True
5480            print('Warning -- Dangling threads: %s' % threads,
5481                  file=sys.stderr)
5482        threads = None
5483
5484
5485class ProcessesMixin(BaseMixin):
5486    TYPE = 'processes'
5487    Process = multiprocessing.Process
5488    connection = multiprocessing.connection
5489    current_process = staticmethod(multiprocessing.current_process)
5490    parent_process = staticmethod(multiprocessing.parent_process)
5491    active_children = staticmethod(multiprocessing.active_children)
5492    Pool = staticmethod(multiprocessing.Pool)
5493    Pipe = staticmethod(multiprocessing.Pipe)
5494    Queue = staticmethod(multiprocessing.Queue)
5495    JoinableQueue = staticmethod(multiprocessing.JoinableQueue)
5496    Lock = staticmethod(multiprocessing.Lock)
5497    RLock = staticmethod(multiprocessing.RLock)
5498    Semaphore = staticmethod(multiprocessing.Semaphore)
5499    BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
5500    Condition = staticmethod(multiprocessing.Condition)
5501    Event = staticmethod(multiprocessing.Event)
5502    Barrier = staticmethod(multiprocessing.Barrier)
5503    Value = staticmethod(multiprocessing.Value)
5504    Array = staticmethod(multiprocessing.Array)
5505    RawValue = staticmethod(multiprocessing.RawValue)
5506    RawArray = staticmethod(multiprocessing.RawArray)
5507
5508
5509class ManagerMixin(BaseMixin):
5510    TYPE = 'manager'
5511    Process = multiprocessing.Process
5512    Queue = property(operator.attrgetter('manager.Queue'))
5513    JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))
5514    Lock = property(operator.attrgetter('manager.Lock'))
5515    RLock = property(operator.attrgetter('manager.RLock'))
5516    Semaphore = property(operator.attrgetter('manager.Semaphore'))
5517    BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
5518    Condition = property(operator.attrgetter('manager.Condition'))
5519    Event = property(operator.attrgetter('manager.Event'))
5520    Barrier = property(operator.attrgetter('manager.Barrier'))
5521    Value = property(operator.attrgetter('manager.Value'))
5522    Array = property(operator.attrgetter('manager.Array'))
5523    list = property(operator.attrgetter('manager.list'))
5524    dict = property(operator.attrgetter('manager.dict'))
5525    Namespace = property(operator.attrgetter('manager.Namespace'))
5526
5527    @classmethod
5528    def Pool(cls, *args, **kwds):
5529        return cls.manager.Pool(*args, **kwds)
5530
5531    @classmethod
5532    def setUpClass(cls):
5533        super().setUpClass()
5534        cls.manager = multiprocessing.Manager()
5535
5536    @classmethod
5537    def tearDownClass(cls):
5538        # only the manager process should be returned by active_children()
5539        # but this can take a bit on slow machines, so wait a few seconds
5540        # if there are other children too (see #17395)
5541        start_time = time.monotonic()
5542        t = 0.01
5543        while len(multiprocessing.active_children()) > 1:
5544            time.sleep(t)
5545            t *= 2
5546            dt = time.monotonic() - start_time
5547            if dt >= 5.0:
5548                test.support.environment_altered = True
5549                print("Warning -- multiprocessing.Manager still has %s active "
5550                      "children after %s seconds"
5551                      % (multiprocessing.active_children(), dt),
5552                      file=sys.stderr)
5553                break
5554
5555        gc.collect()                       # do garbage collection
5556        if cls.manager._number_of_objects() != 0:
5557            # This is not really an error since some tests do not
5558            # ensure that all processes which hold a reference to a
5559            # managed object have been joined.
5560            test.support.environment_altered = True
5561            print('Warning -- Shared objects which still exist at manager '
5562                  'shutdown:')
5563            print(cls.manager._debug_info())
5564        cls.manager.shutdown()
5565        cls.manager.join()
5566        cls.manager = None
5567
5568        super().tearDownClass()
5569
5570
5571class ThreadsMixin(BaseMixin):
5572    TYPE = 'threads'
5573    Process = multiprocessing.dummy.Process
5574    connection = multiprocessing.dummy.connection
5575    current_process = staticmethod(multiprocessing.dummy.current_process)
5576    active_children = staticmethod(multiprocessing.dummy.active_children)
5577    Pool = staticmethod(multiprocessing.dummy.Pool)
5578    Pipe = staticmethod(multiprocessing.dummy.Pipe)
5579    Queue = staticmethod(multiprocessing.dummy.Queue)
5580    JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)
5581    Lock = staticmethod(multiprocessing.dummy.Lock)
5582    RLock = staticmethod(multiprocessing.dummy.RLock)
5583    Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
5584    BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
5585    Condition = staticmethod(multiprocessing.dummy.Condition)
5586    Event = staticmethod(multiprocessing.dummy.Event)
5587    Barrier = staticmethod(multiprocessing.dummy.Barrier)
5588    Value = staticmethod(multiprocessing.dummy.Value)
5589    Array = staticmethod(multiprocessing.dummy.Array)
5590
5591#
5592# Functions used to create test cases from the base ones in this module
5593#
5594
5595def install_tests_in_module_dict(remote_globs, start_method):
5596    __module__ = remote_globs['__name__']
5597    local_globs = globals()
5598    ALL_TYPES = {'processes', 'threads', 'manager'}
5599
5600    for name, base in local_globs.items():
5601        if not isinstance(base, type):
5602            continue
5603        if issubclass(base, BaseTestCase):
5604            if base is BaseTestCase:
5605                continue
5606            assert set(base.ALLOWED_TYPES) <= ALL_TYPES, base.ALLOWED_TYPES
5607            for type_ in base.ALLOWED_TYPES:
5608                newname = 'With' + type_.capitalize() + name[1:]
5609                Mixin = local_globs[type_.capitalize() + 'Mixin']
5610                class Temp(base, Mixin, unittest.TestCase):
5611                    pass
5612                Temp.__name__ = Temp.__qualname__ = newname
5613                Temp.__module__ = __module__
5614                remote_globs[newname] = Temp
5615        elif issubclass(base, unittest.TestCase):
5616            class Temp(base, object):
5617                pass
5618            Temp.__name__ = Temp.__qualname__ = name
5619            Temp.__module__ = __module__
5620            remote_globs[name] = Temp
5621
5622    dangling = [None, None]
5623    old_start_method = [None]
5624
5625    def setUpModule():
5626        multiprocessing.set_forkserver_preload(PRELOAD)
5627        multiprocessing.process._cleanup()
5628        dangling[0] = multiprocessing.process._dangling.copy()
5629        dangling[1] = threading._dangling.copy()
5630        old_start_method[0] = multiprocessing.get_start_method(allow_none=True)
5631        try:
5632            multiprocessing.set_start_method(start_method, force=True)
5633        except ValueError:
5634            raise unittest.SkipTest(start_method +
5635                                    ' start method not supported')
5636
5637        if sys.platform.startswith("linux"):
5638            try:
5639                lock = multiprocessing.RLock()
5640            except OSError:
5641                raise unittest.SkipTest("OSError raises on RLock creation, "
5642                                        "see issue 3111!")
5643        check_enough_semaphores()
5644        util.get_temp_dir()     # creates temp directory
5645        multiprocessing.get_logger().setLevel(LOG_LEVEL)
5646
5647    def tearDownModule():
5648        need_sleep = False
5649
5650        # bpo-26762: Some multiprocessing objects like Pool create reference
5651        # cycles. Trigger a garbage collection to break these cycles.
5652        test.support.gc_collect()
5653
5654        multiprocessing.set_start_method(old_start_method[0], force=True)
5655        # pause a bit so we don't get warning about dangling threads/processes
5656        processes = set(multiprocessing.process._dangling) - set(dangling[0])
5657        if processes:
5658            need_sleep = True
5659            test.support.environment_altered = True
5660            print('Warning -- Dangling processes: %s' % processes,
5661                  file=sys.stderr)
5662        processes = None
5663
5664        threads = set(threading._dangling) - set(dangling[1])
5665        if threads:
5666            need_sleep = True
5667            test.support.environment_altered = True
5668            print('Warning -- Dangling threads: %s' % threads,
5669                  file=sys.stderr)
5670        threads = None
5671
5672        # Sleep 500 ms to give time to child processes to complete.
5673        if need_sleep:
5674            time.sleep(0.5)
5675
5676        multiprocessing.util._cleanup_tests()
5677
5678    remote_globs['setUpModule'] = setUpModule
5679    remote_globs['tearDownModule'] = tearDownModule
5680