#
# Unit tests for the multiprocessing package
#

import unittest
import unittest.mock
import queue as pyqueue
import time
import io
import itertools
import sys
import os
import gc
import errno
import signal
import array
import socket
import random
import logging
import subprocess
import struct
import operator
import pickle
import weakref
import warnings
import test.support
import test.support.script_helper
from test import support
from test.support import hashlib_helper
from test.support import import_helper
from test.support import os_helper
from test.support import socket_helper
from test.support import threading_helper
from test.support import warnings_helper


# Skip tests if _multiprocessing wasn't built.
_multiprocessing = import_helper.import_module('_multiprocessing')
# Skip tests if sem_open implementation is broken.
support.skip_if_broken_multiprocessing_synchronize()
import threading

import multiprocessing.connection
import multiprocessing.dummy
import multiprocessing.heap
import multiprocessing.managers
import multiprocessing.pool
import multiprocessing.queues

from multiprocessing import util

try:
    from multiprocessing import reduction
    HAS_REDUCTION = reduction.HAVE_SEND_HANDLE
except ImportError:
    HAS_REDUCTION = False

try:
    from multiprocessing.sharedctypes import Value, copy
    HAS_SHAREDCTYPES = True
except ImportError:
    HAS_SHAREDCTYPES = False

try:
    from multiprocessing import shared_memory
    HAS_SHMEM = True
except ImportError:
    HAS_SHMEM = False

try:
    import msvcrt
except ImportError:
    msvcrt = None
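
# These HAS_* feature flags are consumed by skip decorators throughout the
# file. A minimal sketch of the pattern (illustrative only; TestShmemSketch
# is a hypothetical name, not part of this test suite):
#
#     @unittest.skipUnless(HAS_SHMEM, "requires multiprocessing.shared_memory")
#     class TestShmemSketch(unittest.TestCase):
#         def test_roundtrip(self):
#             shm = shared_memory.SharedMemory(create=True, size=16)
#             try:
#                 shm.buf[0] = 42
#             finally:
#                 shm.close()
#                 shm.unlink()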


if support.check_sanitizer(address=True):
    # bpo-45200: Skip multiprocessing tests if Python is built with ASAN to
    # work around a libasan race condition: dead lock in pthread_create().
    raise unittest.SkipTest("libasan has a pthread_create() dead lock")


def latin(s):
    return s.encode('latin')


def close_queue(queue):
    if isinstance(queue, multiprocessing.queues.Queue):
        queue.close()
        queue.join_thread()

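# A minimal sketch of why close_queue() matters (illustrative only): a
# multiprocessing.Queue owns a background feeder thread, so tests close()
# the queue and then join_thread() to be sure that thread has flushed its
# buffer and exited before teardown:
#
#     q = multiprocessing.Queue()
#     q.put('item')
#     assert q.get() == 'item'
#     q.close()          # no more puts; the feeder flushes remaining data
#     q.join_thread()    # wait for the feeder thread to terminate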

def join_process(process):
    # Since multiprocessing.Process has the same API as threading.Thread
    # (join() and is_alive()), the support function can be reused.
    threading_helper.join_thread(process)


if os.name == "posix":
    from multiprocessing import resource_tracker

    def _resource_unlink(name, rtype):
        resource_tracker._CLEANUP_FUNCS[rtype](name)


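# A hedged usage sketch for _resource_unlink() (the names below are
# hypothetical): resource_tracker._CLEANUP_FUNCS maps a resource type to the
# OS-level unlink function, so on POSIX a leaked named resource can be
# force-released like this:
#
#     _resource_unlink('/psm_leaked_block', 'shared_memory')  # shm_unlink
#     _resource_unlink('/mp-leaked-sem', 'semaphore')         # sem_unlink
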
#
# Constants
#

LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

DELTA = 0.1
CHECK_TIMINGS = False     # Setting this to True makes the tests take a lot
                          # longer and can sometimes cause non-serious
                          # failures because some calls block a bit longer
                          # than expected.
if CHECK_TIMINGS:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
else:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1

HAVE_GETVALUE = not getattr(_multiprocessing,
                            'HAVE_BROKEN_SEM_GETVALUE', False)

WIN32 = (sys.platform == "win32")

from multiprocessing.connection import wait

def wait_for_handle(handle, timeout):
    if timeout is not None and timeout < 0.0:
        timeout = None
    return wait([handle], timeout)

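# A minimal sketch of what wait_for_handle() is for (illustrative only):
# multiprocessing.connection.wait() takes a list of waitable objects (here a
# single process sentinel) and returns the subset that is ready, so a
# non-empty (truthy) result means the process has exited:
#
#     p = multiprocessing.Process(target=time.sleep, args=(0.1,))
#     p.start()
#     assert not wait_for_handle(p.sentinel, timeout=0.0)  # still running
#     p.join()
#     assert wait_for_handle(p.sentinel, timeout=1.0)      # has exited
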
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    # sysconf() missing or the name is unknown; fall back to a sane default.
    MAXFD = 256

# To speed up tests when using the forkserver, we can preload these:
PRELOAD = ['__main__', 'test.test_multiprocessing_forkserver']

#
# Some tests require ctypes
#

try:
    from ctypes import Structure, c_int, c_double, c_longlong
except ImportError:
    Structure = object
    c_int = c_double = c_longlong = None


def check_enough_semaphores():
    """Check that the system supports enough semaphores to run the test."""
    # minimum number of semaphores available according to POSIX
    nsems_min = 256
    try:
        nsems = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems == -1 or nsems >= nsems_min:
        return
    raise unittest.SkipTest("The OS doesn't support enough semaphores "
                            "to run the test (required: %d)." % nsems_min)


#
# Creates a wrapper for a function which records the time it takes to finish
#

class TimingWrapper(object):

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        t = time.monotonic()
        try:
            return self.func(*args, **kwds)
        finally:
            self.elapsed = time.monotonic() - t

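# A minimal usage sketch for TimingWrapper (illustrative only): wrapping a
# blocking call records how long it actually blocked, which is what the
# assertTimingAlmostEqual() checks in the tests below rely on:
#
#     sem = multiprocessing.Semaphore(0)
#     acquire = TimingWrapper(sem.acquire)
#     assert acquire(True, 0.1) is False   # blocks for ~0.1s, then times out
#     print(acquire.elapsed)               # approximately 0.1
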
#
# Base class for test cases
#

class BaseTestCase(object):

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        if CHECK_TIMINGS:
            self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        try:
            res = func(*args)
        except NotImplementedError:
            pass
        else:
            return self.assertEqual(value, res)

    # Refuse to be pickled, for the sanity of Windows users: failing cleanly
    # here beats crashing or freezing in multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__

#
# Return the value of a semaphore
#

def get_value(sem):
    # Try the public multiprocessing API first; fall back to the private
    # attribute names used by threading.Semaphore across versions.
    try:
        return sem.get_value()
    except AttributeError:
        try:
            return sem._Semaphore__value
        except AttributeError:
            try:
                return sem._value
            except AttributeError:
                raise NotImplementedError

#
# Testcases
#

class DummyCallable:
    def __call__(self, q, c):
        assert isinstance(c, DummyCallable)
        q.put(5)


class _TestProcess(BaseTestCase):

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertFalse(current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertGreater(len(authkey), 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    def test_daemon_argument(self):
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        # By default uses the current process's daemon flag.
        proc0 = self.Process(target=self._test)
        self.assertEqual(proc0.daemon, self.current_process().daemon)
        proc1 = self.Process(target=self._test, daemon=True)
        self.assertTrue(proc1.daemon)
        proc2 = self.Process(target=self._test, daemon=False)
        self.assertFalse(proc2.daemon)

    @classmethod
    def _test(cls, q, *args, **kwds):
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_parent_process_attributes(self):
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        self.assertIsNone(self.parent_process())

        rconn, wconn = self.Pipe(duplex=False)
        p = self.Process(target=self._test_send_parent_process, args=(wconn,))
        p.start()
        p.join()
        parent_pid, parent_name = rconn.recv()
        self.assertEqual(parent_pid, self.current_process().pid)
        self.assertEqual(parent_pid, os.getpid())
        self.assertEqual(parent_name, self.current_process().name)

    @classmethod
    def _test_send_parent_process(cls, wconn):
        from multiprocessing.process import parent_process
        wconn.send([parent_process().pid, parent_process().name])

    def test_parent_process(self):
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        # Launch a child process. Make it launch a grandchild process. Kill
        # the child process and make sure that the grandchild notices the
        # death of its parent (a.k.a. the child process).
        rconn, wconn = self.Pipe(duplex=False)
        p = self.Process(
            target=self._test_create_grandchild_process, args=(wconn, ))
        p.start()

        if not rconn.poll(timeout=support.LONG_TIMEOUT):
            raise AssertionError("Could not communicate with child process")
        parent_process_status = rconn.recv()
        self.assertEqual(parent_process_status, "alive")

        p.terminate()
        p.join()

        if not rconn.poll(timeout=support.LONG_TIMEOUT):
            raise AssertionError("Could not communicate with child process")
        parent_process_status = rconn.recv()
        self.assertEqual(parent_process_status, "not alive")

    @classmethod
    def _test_create_grandchild_process(cls, wconn):
        p = cls.Process(target=cls._test_report_parent_status, args=(wconn, ))
        p.start()
        time.sleep(300)

    @classmethod
    def _test_report_parent_status(cls, wconn):
        from multiprocessing.process import parent_process
        wconn.send("alive" if parent_process().is_alive() else "not alive")
        parent_process().join(timeout=support.SHORT_TIMEOUT)
        wconn.send("alive" if parent_process().is_alive() else "not alive")

    def test_process(self):
        q = self.Queue(1)
        e = self.Event()
        args = (q, 1, 2)
        kwargs = {'hello': 23, 'bye': 2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertIs(type(self.active_children()), list)
        self.assertEqual(p.exitcode, None)

        p.start()

        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())
        close_queue(q)

    @unittest.skipUnless(threading._HAVE_THREAD_NATIVE_ID, "needs native_id")
    def test_process_mainthread_native_id(self):
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        current_mainthread_native_id = threading.main_thread().native_id

        q = self.Queue(1)
        p = self.Process(target=self._test_process_mainthread_native_id, args=(q,))
        p.start()

        child_mainthread_native_id = q.get()
        p.join()
        close_queue(q)

        self.assertNotEqual(current_mainthread_native_id, child_mainthread_native_id)

    @classmethod
    def _test_process_mainthread_native_id(cls, q):
        mainthread_native_id = threading.main_thread().native_id
        q.put(mainthread_native_id)

    @classmethod
    def _sleep_some(cls):
        time.sleep(100)

    @classmethod
    def _test_sleep(cls, delay):
        time.sleep(delay)

    def _kill_process(self, meth):
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        p = self.Process(target=self._sleep_some)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        join = TimingWrapper(p.join)

        self.assertEqual(join(0), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)
        self.assertEqual(p.is_alive(), True)

        self.assertEqual(join(-1), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)
        self.assertEqual(p.is_alive(), True)

        # XXX maybe terminating too soon causes the problems on Gentoo...
        time.sleep(1)

        meth(p)

        if hasattr(signal, 'alarm'):
            # On the Gentoo buildbot waitpid() often seems to block forever.
            # We use alarm() to interrupt it if it blocks for too long.
            def handler(*args):
                raise RuntimeError('join took too long: %s' % p)
            old_handler = signal.signal(signal.SIGALRM, handler)
            try:
                signal.alarm(10)
                self.assertEqual(join(), None)
            finally:
                signal.alarm(0)
                signal.signal(signal.SIGALRM, old_handler)
        else:
            self.assertEqual(join(), None)

        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        return p.exitcode

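    # A hedged note on the exitcode checks below (illustrative sketch): on
    # POSIX, Process.exitcode is -N when the child was terminated by signal N:
    #
    #     p = multiprocessing.Process(target=time.sleep, args=(10,))
    #     p.start()
    #     p.terminate()                    # sends SIGTERM
    #     p.join()
    #     assert p.exitcode == -signal.SIGTERM
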
    def test_terminate(self):
        exitcode = self._kill_process(multiprocessing.Process.terminate)
        if os.name != 'nt':
            self.assertEqual(exitcode, -signal.SIGTERM)

    def test_kill(self):
        exitcode = self._kill_process(multiprocessing.Process.kill)
        if os.name != 'nt':
            self.assertEqual(exitcode, -signal.SIGKILL)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertIs(type(cpus), int)
        self.assertGreaterEqual(cpus, 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.daemon = True
        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)

    @classmethod
    def _test_sentinel(cls, event):
        event.wait(10.0)

    def test_sentinel(self):
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        event = self.Event()
        p = self.Process(target=self._test_sentinel, args=(event,))
        with self.assertRaises(ValueError):
            p.sentinel
        p.start()
        self.addCleanup(p.join)
        sentinel = p.sentinel
        self.assertIsInstance(sentinel, int)
        self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
        event.set()
        p.join()
        self.assertTrue(wait_for_handle(sentinel, timeout=1))

    @classmethod
    def _test_close(cls, rc=0, q=None):
        if q is not None:
            q.get()
        sys.exit(rc)

    def test_close(self):
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        q = self.Queue()
        p = self.Process(target=self._test_close, kwargs={'q': q})
        p.daemon = True
        p.start()
        self.assertEqual(p.is_alive(), True)
        # Child is still alive, cannot close
        with self.assertRaises(ValueError):
            p.close()

        q.put(None)
        p.join()
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.exitcode, 0)
        p.close()
        with self.assertRaises(ValueError):
            p.is_alive()
        with self.assertRaises(ValueError):
            p.join()
        with self.assertRaises(ValueError):
            p.terminate()
        p.close()

        wr = weakref.ref(p)
        del p
        gc.collect()
        self.assertIs(wr(), None)

        close_queue(q)

    def test_many_processes(self):
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sm = multiprocessing.get_start_method()
        N = 5 if sm == 'spawn' else 100

        # Try to overwhelm the forkserver loop with events
        procs = [self.Process(target=self._test_sleep, args=(0.01,))
                 for i in range(N)]
        for p in procs:
            p.start()
        for p in procs:
            join_process(p)
        for p in procs:
            self.assertEqual(p.exitcode, 0)

        procs = [self.Process(target=self._sleep_some)
                 for i in range(N)]
        for p in procs:
            p.start()
        time.sleep(0.001)  # let the children start...
        for p in procs:
            p.terminate()
        for p in procs:
            join_process(p)
        if os.name != 'nt':
            exitcodes = [-signal.SIGTERM]
            if sys.platform == 'darwin':
                # bpo-31510: On macOS, killing a freshly started process with
                # SIGTERM sometimes kills the process with SIGKILL.
                exitcodes.append(-signal.SIGKILL)
            for p in procs:
                self.assertIn(p.exitcode, exitcodes)

    def test_lose_target_ref(self):
        c = DummyCallable()
        wr = weakref.ref(c)
        q = self.Queue()
        p = self.Process(target=c, args=(q, c))
        del c
        p.start()
        p.join()
        gc.collect()  # For PyPy or other GCs.
        self.assertIs(wr(), None)
        self.assertEqual(q.get(), 5)
        close_queue(q)

    @classmethod
    def _test_child_fd_inflation(cls, evt, q):
        q.put(os_helper.fd_count())
        evt.wait()

    def test_child_fd_inflation(self):
        # Number of fds in child processes should not grow with the
        # number of running children.
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sm = multiprocessing.get_start_method()
        if sm == 'fork':
            # The fork method by design inherits all fds from the parent,
            # trying to go against it is a lost battle
            self.skipTest('test not appropriate for {}'.format(sm))

        N = 5
        evt = self.Event()
        q = self.Queue()

        procs = [self.Process(target=self._test_child_fd_inflation, args=(evt, q))
                 for i in range(N)]
        for p in procs:
            p.start()

        try:
            fd_counts = [q.get() for i in range(N)]
            self.assertEqual(len(set(fd_counts)), 1, fd_counts)

        finally:
            evt.set()
            for p in procs:
                p.join()
            close_queue(q)

    @classmethod
    def _test_wait_for_threads(cls, evt):
        def func1():
            time.sleep(0.5)
            evt.set()

        def func2():
            time.sleep(20)
            evt.clear()

        threading.Thread(target=func1).start()
        threading.Thread(target=func2, daemon=True).start()

    def test_wait_for_threads(self):
        # A child process should wait for non-daemonic threads to end
        # before exiting
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        evt = self.Event()
        proc = self.Process(target=self._test_wait_for_threads, args=(evt,))
        proc.start()
        proc.join()
        self.assertTrue(evt.is_set())

    @classmethod
    def _test_error_on_stdio_flush(cls, evt, break_std_streams={}):
        for stream_name, action in break_std_streams.items():
            if action == 'close':
                stream = io.StringIO()
                stream.close()
            else:
                assert action == 'remove'
                stream = None
            setattr(sys, stream_name, stream)
        evt.set()

    def test_error_on_stdio_flush_1(self):
        # Check that Process works with broken standard streams
        streams = [io.StringIO(), None]
        streams[0].close()
        for stream_name in ('stdout', 'stderr'):
            for stream in streams:
                old_stream = getattr(sys, stream_name)
                setattr(sys, stream_name, stream)
                try:
                    evt = self.Event()
                    proc = self.Process(target=self._test_error_on_stdio_flush,
                                        args=(evt,))
                    proc.start()
                    proc.join()
                    self.assertTrue(evt.is_set())
                    self.assertEqual(proc.exitcode, 0)
                finally:
                    setattr(sys, stream_name, old_stream)

    def test_error_on_stdio_flush_2(self):
        # Same as test_error_on_stdio_flush_1(), but standard streams are
        # broken by the child process
        for stream_name in ('stdout', 'stderr'):
            for action in ('close', 'remove'):
                old_stream = getattr(sys, stream_name)
                try:
                    evt = self.Event()
                    proc = self.Process(target=self._test_error_on_stdio_flush,
                                        args=(evt, {stream_name: action}))
                    proc.start()
                    proc.join()
                    self.assertTrue(evt.is_set())
                    self.assertEqual(proc.exitcode, 0)
                finally:
                    setattr(sys, stream_name, old_stream)

    @classmethod
    def _sleep_and_set_event(cls, evt, delay=0.0):
        time.sleep(delay)
        evt.set()

    def check_forkserver_death(self, signum):
        # bpo-31308: if the forkserver process has died, we should still
        # be able to create and run new Process instances (the forkserver
        # is implicitly restarted).
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        sm = multiprocessing.get_start_method()
        if sm != 'forkserver':
            # Only the forkserver start method has a forkserver process
            # to kill and restart.
            self.skipTest('test not appropriate for {}'.format(sm))

        from multiprocessing.forkserver import _forkserver
        _forkserver.ensure_running()

        # First process sleeps 500 ms
        delay = 0.5

        evt = self.Event()
        proc = self.Process(target=self._sleep_and_set_event, args=(evt, delay))
        proc.start()

        pid = _forkserver._forkserver_pid
        os.kill(pid, signum)
        # Give the forkserver time to die and proc time to complete.
        time.sleep(delay * 2.0)

        evt2 = self.Event()
        proc2 = self.Process(target=self._sleep_and_set_event, args=(evt2,))
        proc2.start()
        proc2.join()
        self.assertTrue(evt2.is_set())
        self.assertEqual(proc2.exitcode, 0)

        proc.join()
        self.assertTrue(evt.is_set())
        self.assertIn(proc.exitcode, (0, 255))

    def test_forkserver_sigint(self):
        # Catchable signal
        self.check_forkserver_death(signal.SIGINT)

    def test_forkserver_sigkill(self):
        # Uncatchable signal
        if os.name != 'nt':
            self.check_forkserver_death(signal.SIGKILL)


#
#
#

class _UpperCaser(multiprocessing.Process):

    def __init__(self):
        multiprocessing.Process.__init__(self)
        self.child_conn, self.parent_conn = multiprocessing.Pipe()

    def run(self):
        self.parent_conn.close()
        for s in iter(self.child_conn.recv, None):
            self.child_conn.send(s.upper())
        self.child_conn.close()

    def submit(self, s):
        assert type(s) is str
        self.parent_conn.send(s)
        return self.parent_conn.recv()

    def stop(self):
        self.parent_conn.send(None)
        self.parent_conn.close()
        self.child_conn.close()

class _TestSubclassingProcess(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        uppercaser = _UpperCaser()
        uppercaser.daemon = True
        uppercaser.start()
        self.assertEqual(uppercaser.submit('hello'), 'HELLO')
        self.assertEqual(uppercaser.submit('world'), 'WORLD')
        uppercaser.stop()
        uppercaser.join()

    def test_stderr_flush(self):
        # sys.stderr is flushed at process shutdown (issue #13812)
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        testfn = os_helper.TESTFN
        self.addCleanup(os_helper.unlink, testfn)
        proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
        proc.start()
        proc.join()
        with open(testfn, encoding="utf-8") as f:
            err = f.read()
            # The whole traceback was printed
            self.assertIn("ZeroDivisionError", err)
            self.assertIn("test_multiprocessing.py", err)
            self.assertIn("1/0 # MARKER", err)

    @classmethod
    def _test_stderr_flush(cls, testfn):
        fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
        sys.stderr = open(fd, 'w', encoding="utf-8", closefd=False)
        1/0 # MARKER


    @classmethod
    def _test_sys_exit(cls, reason, testfn):
        fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
        sys.stderr = open(fd, 'w', encoding="utf-8", closefd=False)
        sys.exit(reason)

    def test_sys_exit(self):
        # See Issue 13854
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        testfn = os_helper.TESTFN
        self.addCleanup(os_helper.unlink, testfn)

        for reason in (
            [1, 2, 3],
            'ignore this',
        ):
            p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
            p.daemon = True
            p.start()
            join_process(p)
            self.assertEqual(p.exitcode, 1)

            with open(testfn, encoding="utf-8") as f:
                content = f.read()
            self.assertEqual(content.rstrip(), str(reason))

            os.unlink(testfn)

        cases = [
            ((True,), 1),
            ((False,), 0),
            ((8,), 8),
            ((None,), 0),
            ((), 0),
            ]

        for args, expected in cases:
            with self.subTest(args=args):
                p = self.Process(target=sys.exit, args=args)
                p.daemon = True
                p.start()
                join_process(p)
                self.assertEqual(p.exitcode, expected)

#
#
#

def queue_empty(q):
    if hasattr(q, 'empty'):
        return q.empty()
    else:
        return q.qsize() == 0

def queue_full(q, maxsize):
    if hasattr(q, 'full'):
        return q.full()
    else:
        return q.qsize() == maxsize

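# A minimal sketch of how these helpers are used (illustrative only): they
# fall back to qsize() for queue flavors that do not expose empty()/full().
# The short sleep mirrors the tests below, since a put() may still be sitting
# in the feeder thread's buffer:
#
#     q = multiprocessing.Queue(maxsize=1)
#     assert queue_empty(q)
#     q.put('x')
#     time.sleep(0.1)
#     assert queue_full(q, 1)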

class _TestQueue(BaseTestCase):


    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # The values may be in the feeder's buffer but not yet in the pipe,
        # so sleep a bit.
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()
        close_queue(queue)

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()
        close_queue(queue)

    @classmethod
    def _test_fork(cls, queue):
        for i in range(10, 20):
            queue.put(i)
        # Note that at this point the items may only be buffered, so the
        # process cannot shut down until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.daemon = True
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()
        close_queue(queue)

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            self.skipTest('qsize method not implemented')
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)
        close_queue(q)

    @classmethod
    def _test_task_done(cls, q):
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in range(4)]

        for p in workers:
            p.daemon = True
            p.start()

        for i in range(10):
            queue.put(i)

        queue.join()

        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()
        close_queue(queue)

    def test_no_import_lock_contention(self):
        with os_helper.temp_cwd():
            module_name = 'imported_by_an_imported_module'
            with open(module_name + '.py', 'w', encoding="utf-8") as f:
                f.write("""if 1:
                    import multiprocessing

                    q = multiprocessing.Queue()
                    q.put('knock knock')
                    q.get(timeout=3)
                    q.close()
                    del q
                """)

            with import_helper.DirsOnSysPath(os.getcwd()):
                try:
                    __import__(module_name)
                except pyqueue.Empty:
                    self.fail("Probable regression on import lock contention;"
                              " see Issue #22853")

    def test_timeout(self):
        q = multiprocessing.Queue()
        start = time.monotonic()
        self.assertRaises(pyqueue.Empty, q.get, True, 0.200)
        delta = time.monotonic() - start
        # bpo-30317: Tolerate a delta of 100 ms because of the bad clock
        # resolution on Windows (usually 15.6 ms). x86 Windows7 3.x once
        # failed because the delta was only 135.8 ms.
        self.assertGreaterEqual(delta, 0.100)
        close_queue(q)

    def test_queue_feeder_donot_stop_onexc(self):
        # bpo-30414: verify feeder handles exceptions correctly
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        class NotSerializable(object):
            def __reduce__(self):
                raise AttributeError
        with test.support.captured_stderr():
            q = self.Queue()
            q.put(NotSerializable())
            q.put(True)
            self.assertTrue(q.get(timeout=support.SHORT_TIMEOUT))
            close_queue(q)

        with test.support.captured_stderr():
            # bpo-33078: verify that the queue size is correctly handled
            # on errors.
            q = self.Queue(maxsize=1)
            q.put(NotSerializable())
            q.put(True)
            try:
                self.assertEqual(q.qsize(), 1)
            except NotImplementedError:
                # qsize is not available on all platforms, as it
                # relies on sem_getvalue
                pass
            self.assertTrue(q.get(timeout=support.SHORT_TIMEOUT))
            # Check that the size of the queue is correct
            self.assertTrue(q.empty())
            close_queue(q)

    def test_queue_feeder_on_queue_feeder_error(self):
        # bpo-30006: verify feeder handles exceptions using the
        # _on_queue_feeder_error hook.
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        class NotSerializable(object):
            """Mock unserializable object"""
            def __init__(self):
                self.reduce_was_called = False
                self.on_queue_feeder_error_was_called = False

            def __reduce__(self):
                self.reduce_was_called = True
                raise AttributeError

        class SafeQueue(multiprocessing.queues.Queue):
            """Queue with an overridden _on_queue_feeder_error hook"""
            @staticmethod
            def _on_queue_feeder_error(e, obj):
                if (isinstance(e, AttributeError) and
                        isinstance(obj, NotSerializable)):
                    obj.on_queue_feeder_error_was_called = True

        not_serializable_obj = NotSerializable()
        # The captured_stderr reduces the noise in the test report
        with test.support.captured_stderr():
            q = SafeQueue(ctx=multiprocessing.get_context())
            q.put(not_serializable_obj)

            # Verify that q is still functioning correctly
            q.put(True)
            self.assertTrue(q.get(timeout=support.SHORT_TIMEOUT))

        # Assert that the serialization and the hook have been called correctly
        self.assertTrue(not_serializable_obj.reduce_was_called)
        self.assertTrue(not_serializable_obj.on_queue_feeder_error_was_called)

    def test_closed_queue_put_get_exceptions(self):
        for q in multiprocessing.Queue(), multiprocessing.JoinableQueue():
            q.close()
            with self.assertRaisesRegex(ValueError, 'is closed'):
                q.put('foo')
            with self.assertRaisesRegex(ValueError, 'is closed'):
                q.get()
#
#
#

class _TestLock(BaseTestCase):

    def test_lock(self):
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    def test_rlock(self):
        lock = self.RLock()
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.release(), None)
        self.assertEqual(lock.release(), None)
        self.assertEqual(lock.release(), None)
        self.assertRaises((AssertionError, RuntimeError), lock.release)

    def test_lock_context(self):
        with self.Lock():
            pass


class _TestSemaphore(BaseTestCase):

    def _test_semaphore(self, sem):
        self.assertReturnsIfImplemented(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(2, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(3, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(4, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on macOS
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        self.assertEqual(acquire(False), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, None), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0)

        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)


class _TestCondition(BaseTestCase):

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def assertReachesEventually(self, func, value):
        for i in range(10):
            try:
                if func() == value:
                    break
            except NotImplementedError:
                break
            time.sleep(DELTA)
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(value, func)

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        p = self.Process(target=self.f, args=(cond, sleeping, woken))
        p.daemon = True
        p.start()
        self.addCleanup(p.join)

        p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        p.daemon = True
        p.start()
        self.addCleanup(p.join)

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)
        p.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()
            self.addCleanup(p.join)

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()
            self.addCleanup(t.join)

        # wait for them all to sleep
        for i in range(6):
            sleeping.acquire()

        # check they have all timed out
        for i in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()
            self.addCleanup(p.join)

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()
            self.addCleanup(t.join)

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        self.assertReachesEventually(lambda: get_value(woken), 6)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_notify_n(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()
            self.addCleanup(p.join)

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()
            self.addCleanup(t.join)

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake some of them up
        cond.acquire()
        cond.notify(n=2)
        cond.release()

        # check 2 have woken
        self.assertReachesEventually(lambda: get_value(woken), 2)

        # wake the rest of them
        cond.acquire()
        cond.notify(n=4)
        cond.release()

        self.assertReachesEventually(lambda: get_value(woken), 6)

        # doesn't do anything more
        cond.acquire()
        cond.notify(n=3)
        cond.release()

        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

    @classmethod
    def _test_waitfor_f(cls, cond, state):
        with cond:
            state.value = 0
            cond.notify()
            result = cond.wait_for(lambda : state.value==4)
            if not result or state.value != 4:
                sys.exit(1)

    @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
    def test_waitfor(self):
        # based on test in test/lock_tests.py
        cond = self.Condition()
        state = self.Value('i', -1)

        p = self.Process(target=self._test_waitfor_f, args=(cond, state))
        p.daemon = True
        p.start()

        with cond:
            result = cond.wait_for(lambda : state.value==0)
            self.assertTrue(result)
            self.assertEqual(state.value, 0)

        for i in range(4):
            time.sleep(0.01)
            with cond:
                state.value += 1
                cond.notify()

        join_process(p)
        self.assertEqual(p.exitcode, 0)

    @classmethod
    def _test_waitfor_timeout_f(cls, cond, state, success, sem):
        sem.release()
        with cond:
            expected = 0.1
            dt = time.monotonic()
            result = cond.wait_for(lambda : state.value==4, timeout=expected)
            dt = time.monotonic() - dt
            # borrow logic in assertTimeout() from test/lock_tests.py
            if not result and expected * 0.6 < dt < expected * 10.0:
                success.value = True

    @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
    def test_waitfor_timeout(self):
        # based on test in test/lock_tests.py
        cond = self.Condition()
        state = self.Value('i', 0)
        success = self.Value('i', False)
        sem = self.Semaphore(0)

        p = self.Process(target=self._test_waitfor_timeout_f,
                         args=(cond, state, success, sem))
        p.daemon = True
        p.start()
        self.assertTrue(sem.acquire(timeout=support.LONG_TIMEOUT))

        # Only increment 3 times, so state == 4 is never reached.
        for i in range(3):
            time.sleep(0.01)
            with cond:
                state.value += 1
                cond.notify()

        join_process(p)
        self.assertTrue(success.value)

    @classmethod
    def _test_wait_result(cls, c, pid):
        with c:
            c.notify()
        time.sleep(1)
        if pid is not None:
            os.kill(pid, signal.SIGINT)

    def test_wait_result(self):
        if isinstance(self, ProcessesMixin) and sys.platform != 'win32':
            pid = os.getpid()
        else:
            pid = None

        c = self.Condition()
        with c:
            self.assertFalse(c.wait(0))
            self.assertFalse(c.wait(0.1))

            p = self.Process(target=self._test_wait_result, args=(c, pid))
            p.start()

            self.assertTrue(c.wait(60))
            if pid is not None:
                self.assertRaises(KeyboardInterrupt, c.wait, 60)

            p.join()


class _TestEvent(BaseTestCase):

    @classmethod
    def _test_event(cls, event):
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # Removed temporarily due to API shear: this does not
        # work with threading._Event objects (is_set == isSet).
        self.assertEqual(event.is_set(), False)

        # Removed: threading.Event.wait() returns the value of the __flag
        # instead of None. API shear with the semaphore-backed mp.Event.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # See note above on the API differences
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        p.join()

#
# Tests for Barrier - adapted from tests in test/lock_tests.py
#

1658# Many of the tests for threading.Barrier use a list as an atomic
1659# counter: a value is appended to increment the counter, and the
1660# length of the list gives the value.  We use the class _DummyList
1661# for the same purpose.
1662
1663class _DummyList(object):
1664
1665    def __init__(self):
1666        wrapper = multiprocessing.heap.BufferWrapper(struct.calcsize('i'))
1667        lock = multiprocessing.Lock()
1668        self.__setstate__((wrapper, lock))
1669        self._lengthbuf[0] = 0
1670
1671    def __setstate__(self, state):
1672        (self._wrapper, self._lock) = state
1673        self._lengthbuf = self._wrapper.create_memoryview().cast('i')
1674
1675    def __getstate__(self):
1676        return (self._wrapper, self._lock)
1677
1678    def append(self, _):
1679        with self._lock:
1680            self._lengthbuf[0] += 1
1681
1682    def __len__(self):
1683        with self._lock:
1684            return self._lengthbuf[0]
1685
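# Illustrative sketch (hypothetical _demo_* helpers, not used by the tests):
# the same process-safe counter idea expressed with multiprocessing.Value,
# which pairs a ctypes int with a lock.  _DummyList above builds the counter
# from BufferWrapper directly, presumably to avoid depending on the optional
# sharedctypes module.
def _demo_counter_increment(counter):
    with counter.get_lock():          # serialize the read-modify-write
        counter.value += 1

def _demo_counter():
    counter = multiprocessing.Value('i', 0)   # lock-protected shared int
    ps = [multiprocessing.Process(target=_demo_counter_increment,
                                  args=(counter,)) for _ in range(4)]
    for p in ps:
        p.start()
    for p in ps:
        p.join()
    assert counter.value == 4
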
1686def _wait():
1687    # A crude wait/yield function not relying on synchronization primitives.
1688    time.sleep(0.01)
1689
1690
1691class Bunch(object):
1692    """
1693    A bunch of threads.
1694    """
1695    def __init__(self, namespace, f, args, n, wait_before_exit=False):
1696        """
1697        Construct a bunch of `n` threads running the same function `f`.
1698        If `wait_before_exit` is True, the threads won't terminate until
1699        do_finish() is called.
1700        """
1701        self.f = f
1702        self.args = args
1703        self.n = n
1704        self.started = namespace.DummyList()
1705        self.finished = namespace.DummyList()
1706        self._can_exit = namespace.Event()
1707        if not wait_before_exit:
1708            self._can_exit.set()
1709
1710        threads = []
1711        for i in range(n):
1712            p = namespace.Process(target=self.task)
1713            p.daemon = True
1714            p.start()
1715            threads.append(p)
1716
1717        def finalize(threads):
1718            for p in threads:
1719                p.join()
1720
1721        self._finalizer = weakref.finalize(self, finalize, threads)
1722
1723    def task(self):
1724        pid = os.getpid()
1725        self.started.append(pid)
1726        try:
1727            self.f(*self.args)
1728        finally:
1729            self.finished.append(pid)
1730            self._can_exit.wait(30)
1731            assert self._can_exit.is_set()
1732
1733    def wait_for_started(self):
1734        while len(self.started) < self.n:
1735            _wait()
1736
1737    def wait_for_finished(self):
1738        while len(self.finished) < self.n:
1739            _wait()
1740
1741    def do_finish(self):
1742        self._can_exit.set()
1743
1744    def close(self):
1745        self._finalizer()
1746
1747
1748class AppendTrue(object):
1749    def __init__(self, obj):
1750        self.obj = obj
1751    def __call__(self):
1752        self.obj.append(True)
1753
1754
1755class _TestBarrier(BaseTestCase):
1756    """
1757    Tests for Barrier objects.
1758    """
1759    N = 5
1760    defaultTimeout = 30.0  # XXX Slow Windows buildbots need generous timeout
1761
1762    def setUp(self):
1763        self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout)
1764
1765    def tearDown(self):
1766        self.barrier.abort()
1767        self.barrier = None
1768
1769    def DummyList(self):
1770        if self.TYPE == 'threads':
1771            return []
1772        elif self.TYPE == 'manager':
1773            return self.manager.list()
1774        else:
1775            return _DummyList()
1776
1777    def run_threads(self, f, args):
1778        b = Bunch(self, f, args, self.N-1)
1779        try:
1780            f(*args)
1781            b.wait_for_finished()
1782        finally:
1783            b.close()
1784
1785    @classmethod
1786    def multipass(cls, barrier, results, n):
1787        m = barrier.parties
1788        assert m == cls.N
1789        for i in range(n):
1790            results[0].append(True)
1791            assert len(results[1]) == i * m
1792            barrier.wait()
1793            results[1].append(True)
1794            assert len(results[0]) == (i + 1) * m
1795            barrier.wait()
1796        try:
1797            assert barrier.n_waiting == 0
1798        except NotImplementedError:
1799            pass
1800        assert not barrier.broken
1801
1802    def test_barrier(self, passes=1):
1803        """
1804        Test that a barrier is passed in lockstep
1805        """
1806        results = [self.DummyList(), self.DummyList()]
1807        self.run_threads(self.multipass, (self.barrier, results, passes))
1808
1809    def test_barrier_10(self):
1810        """
1811        Test that a barrier works for 10 consecutive runs
1812        """
1813        return self.test_barrier(10)
1814
1815    @classmethod
1816    def _test_wait_return_f(cls, barrier, queue):
1817        res = barrier.wait()
1818        queue.put(res)
1819
1820    def test_wait_return(self):
1821        """
1822        test the return value from barrier.wait
1823        """
1824        queue = self.Queue()
1825        self.run_threads(self._test_wait_return_f, (self.barrier, queue))
1826        results = [queue.get() for i in range(self.N)]
1827        self.assertEqual(results.count(0), 1)
1828        close_queue(queue)
1829
1830    @classmethod
1831    def _test_action_f(cls, barrier, results):
1832        barrier.wait()
1833        if len(results) != 1:
1834            raise RuntimeError
1835
1836    def test_action(self):
1837        """
1838        Test the 'action' callback
1839        """
1840        results = self.DummyList()
1841        barrier = self.Barrier(self.N, action=AppendTrue(results))
1842        self.run_threads(self._test_action_f, (barrier, results))
1843        self.assertEqual(len(results), 1)
1844
1845    @classmethod
1846    def _test_abort_f(cls, barrier, results1, results2):
1847        try:
1848            i = barrier.wait()
1849            if i == cls.N//2:
1850                raise RuntimeError
1851            barrier.wait()
1852            results1.append(True)
1853        except threading.BrokenBarrierError:
1854            results2.append(True)
1855        except RuntimeError:
1856            barrier.abort()
1857
1858    def test_abort(self):
1859        """
1860        Test that an abort will put the barrier in a broken state
1861        """
1862        results1 = self.DummyList()
1863        results2 = self.DummyList()
1864        self.run_threads(self._test_abort_f,
1865                         (self.barrier, results1, results2))
1866        self.assertEqual(len(results1), 0)
1867        self.assertEqual(len(results2), self.N-1)
1868        self.assertTrue(self.barrier.broken)
1869
1870    @classmethod
1871    def _test_reset_f(cls, barrier, results1, results2, results3):
1872        i = barrier.wait()
1873        if i == cls.N//2:
1874            # Wait until the other threads are all in the barrier.
1875            while barrier.n_waiting < cls.N-1:
1876                time.sleep(0.001)
1877            barrier.reset()
1878        else:
1879            try:
1880                barrier.wait()
1881                results1.append(True)
1882            except threading.BrokenBarrierError:
1883                results2.append(True)
1884        # Now, pass the barrier again
1885        barrier.wait()
1886        results3.append(True)
1887
1888    def test_reset(self):
1889        """
1890        Test that a 'reset' on a barrier frees the waiting threads
1891        """
1892        results1 = self.DummyList()
1893        results2 = self.DummyList()
1894        results3 = self.DummyList()
1895        self.run_threads(self._test_reset_f,
1896                         (self.barrier, results1, results2, results3))
1897        self.assertEqual(len(results1), 0)
1898        self.assertEqual(len(results2), self.N-1)
1899        self.assertEqual(len(results3), self.N)
1900
1901    @classmethod
1902    def _test_abort_and_reset_f(cls, barrier, barrier2,
1903                                results1, results2, results3):
1904        try:
1905            i = barrier.wait()
1906            if i == cls.N//2:
1907                raise RuntimeError
1908            barrier.wait()
1909            results1.append(True)
1910        except threading.BrokenBarrierError:
1911            results2.append(True)
1912        except RuntimeError:
1913            barrier.abort()
1914        # Synchronize and reset the barrier.  Must synchronize first so
1915        # that everyone has left it when we reset, and after so that no
1916        # one enters it before the reset.
1917        if barrier2.wait() == cls.N//2:
1918            barrier.reset()
1919        barrier2.wait()
1920        barrier.wait()
1921        results3.append(True)
1922
1923    def test_abort_and_reset(self):
1924        """
1925        Test that a barrier can be reset after being broken.
1926        """
1927        results1 = self.DummyList()
1928        results2 = self.DummyList()
1929        results3 = self.DummyList()
1930        barrier2 = self.Barrier(self.N)
1931
1932        self.run_threads(self._test_abort_and_reset_f,
1933                         (self.barrier, barrier2, results1, results2, results3))
1934        self.assertEqual(len(results1), 0)
1935        self.assertEqual(len(results2), self.N-1)
1936        self.assertEqual(len(results3), self.N)
1937
1938    @classmethod
1939    def _test_timeout_f(cls, barrier, results):
1940        i = barrier.wait()
1941        if i == cls.N//2:
1942            # One thread is late!
1943            time.sleep(1.0)
1944        try:
1945            barrier.wait(0.5)
1946        except threading.BrokenBarrierError:
1947            results.append(True)
1948
1949    def test_timeout(self):
1950        """
1951        Test wait(timeout)
1952        """
1953        results = self.DummyList()
1954        self.run_threads(self._test_timeout_f, (self.barrier, results))
1955        self.assertEqual(len(results), self.barrier.parties)
1956
1957    @classmethod
1958    def _test_default_timeout_f(cls, barrier, results):
1959        i = barrier.wait(cls.defaultTimeout)
1960        if i == cls.N//2:
1961            # One thread is later than the default timeout
1962            time.sleep(1.0)
1963        try:
1964            barrier.wait()
1965        except threading.BrokenBarrierError:
1966            results.append(True)
1967
1968    def test_default_timeout(self):
1969        """
1970        Test the barrier's default timeout
1971        """
1972        barrier = self.Barrier(self.N, timeout=0.5)
1973        results = self.DummyList()
1974        self.run_threads(self._test_default_timeout_f, (barrier, results))
1975        self.assertEqual(len(results), barrier.parties)
1976
1977    def test_single_thread(self):
1978        b = self.Barrier(1)
1979        b.wait()
1980        b.wait()
1981
1982    @classmethod
1983    def _test_thousand_f(cls, barrier, passes, conn, lock):
1984        for i in range(passes):
1985            barrier.wait()
1986            with lock:
1987                conn.send(i)
1988
1989    def test_thousand(self):
1990        if self.TYPE == 'manager':
1991            self.skipTest('test not appropriate for {}'.format(self.TYPE))
1992        passes = 1000
1993        lock = self.Lock()
1994        conn, child_conn = self.Pipe(False)
1995        for j in range(self.N):
1996            p = self.Process(target=self._test_thousand_f,
1997                             args=(self.barrier, passes, child_conn, lock))
1998            p.start()
1999            self.addCleanup(p.join)
2000
2001        for i in range(passes):
2002            for j in range(self.N):
2003                self.assertEqual(conn.recv(), i)
2004
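# Illustrative sketch (hypothetical _demo_* helpers, not part of the suite)
# of the lockstep pattern the tests above rely on: all N parties block in
# wait() until the last one arrives, and each party receives a distinct
# integer in range(N) as the return value.
def _demo_barrier_party(barrier, queue):
    queue.put(barrier.wait())

def _demo_barrier(n=3):
    barrier = multiprocessing.Barrier(n)
    queue = multiprocessing.SimpleQueue()
    ps = [multiprocessing.Process(target=_demo_barrier_party,
                                  args=(barrier, queue)) for _ in range(n)]
    for p in ps:
        p.start()
    results = sorted(queue.get() for _ in range(n))
    for p in ps:
        p.join()
    assert results == list(range(n))
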
2005#
2006#
2007#
2008
2009class _TestValue(BaseTestCase):
2010
2011    ALLOWED_TYPES = ('processes',)
2012
2013    codes_values = [
2014        ('i', 4343, 24234),
2015        ('d', 3.625, -4.25),
2016        ('h', -232, 234),
2017        ('q', 2 ** 33, 2 ** 34),
2018        ('c', latin('x'), latin('y'))
2019        ]
2020
2021    def setUp(self):
2022        if not HAS_SHAREDCTYPES:
2023            self.skipTest("requires multiprocessing.sharedctypes")
2024
2025    @classmethod
2026    def _test(cls, values):
2027        for sv, cv in zip(values, cls.codes_values):
2028            sv.value = cv[2]
2029
2030
2031    def test_value(self, raw=False):
2032        if raw:
2033            values = [self.RawValue(code, value)
2034                      for code, value, _ in self.codes_values]
2035        else:
2036            values = [self.Value(code, value)
2037                      for code, value, _ in self.codes_values]
2038
2039        for sv, cv in zip(values, self.codes_values):
2040            self.assertEqual(sv.value, cv[1])
2041
2042        proc = self.Process(target=self._test, args=(values,))
2043        proc.daemon = True
2044        proc.start()
2045        proc.join()
2046
2047        for sv, cv in zip(values, self.codes_values):
2048            self.assertEqual(sv.value, cv[2])
2049
2050    def test_rawvalue(self):
2051        self.test_value(raw=True)
2052
2053    def test_getobj_getlock(self):
2054        val1 = self.Value('i', 5)
2055        lock1 = val1.get_lock()
2056        obj1 = val1.get_obj()
2057
2058        val2 = self.Value('i', 5, lock=None)
2059        lock2 = val2.get_lock()
2060        obj2 = val2.get_obj()
2061
2062        lock = self.Lock()
2063        val3 = self.Value('i', 5, lock=lock)
2064        lock3 = val3.get_lock()
2065        obj3 = val3.get_obj()
2066        self.assertEqual(lock, lock3)
2067
2068        arr4 = self.Value('i', 5, lock=False)
2069        self.assertFalse(hasattr(arr4, 'get_lock'))
2070        self.assertFalse(hasattr(arr4, 'get_obj'))
2071
2072        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
2073
2074        arr5 = self.RawValue('i', 5)
2075        self.assertFalse(hasattr(arr5, 'get_lock'))
2076        self.assertFalse(hasattr(arr5, 'get_obj'))
2077
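# Illustrative sketch (hypothetical helper) of the three lock modes probed
# by test_getobj_getlock above: lock=True/None allocates an internal lock,
# lock=<existing Lock> reuses the given lock, and lock=False returns the
# raw ctypes object with no get_lock()/get_obj() wrapper at all.
def _demo_value_locks():
    v = multiprocessing.Value('i', 0)          # synchronized wrapper
    with v.get_lock():                         # explicit lock around +=
        v.value += 1
    raw = multiprocessing.Value('i', 0, lock=False)
    raw.value += 1                             # unsynchronized; caller's job
    return v.value, raw.value                  # -> (1, 1)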
2078
2079class _TestArray(BaseTestCase):
2080
2081    ALLOWED_TYPES = ('processes',)
2082
2083    @classmethod
2084    def f(cls, seq):
2085        for i in range(1, len(seq)):
2086            seq[i] += seq[i-1]
2087
2088    @unittest.skipIf(c_int is None, "requires _ctypes")
2089    def test_array(self, raw=False):
2090        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
2091        if raw:
2092            arr = self.RawArray('i', seq)
2093        else:
2094            arr = self.Array('i', seq)
2095
2096        self.assertEqual(len(arr), len(seq))
2097        self.assertEqual(arr[3], seq[3])
2098        self.assertEqual(list(arr[2:7]), list(seq[2:7]))
2099
2100        arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
2101
2102        self.assertEqual(list(arr[:]), seq)
2103
2104        self.f(seq)
2105
2106        p = self.Process(target=self.f, args=(arr,))
2107        p.daemon = True
2108        p.start()
2109        p.join()
2110
2111        self.assertEqual(list(arr[:]), seq)
2112
2113    @unittest.skipIf(c_int is None, "requires _ctypes")
2114    def test_array_from_size(self):
2115        size = 10
2116        # Test for zeroing (see issue #11675).
2117        # The repetition below strengthens the test by increasing the chances
2118        # of previously allocated non-zero memory being used for the new array
2119        # on the 2nd and 3rd loops.
2120        for _ in range(3):
2121            arr = self.Array('i', size)
2122            self.assertEqual(len(arr), size)
2123            self.assertEqual(list(arr), [0] * size)
2124            arr[:] = range(10)
2125            self.assertEqual(list(arr), list(range(10)))
2126            del arr
2127
2128    @unittest.skipIf(c_int is None, "requires _ctypes")
2129    def test_rawarray(self):
2130        self.test_array(raw=True)
2131
2132    @unittest.skipIf(c_int is None, "requires _ctypes")
2133    def test_getobj_getlock_obj(self):
2134        arr1 = self.Array('i', list(range(10)))
2135        lock1 = arr1.get_lock()
2136        obj1 = arr1.get_obj()
2137
2138        arr2 = self.Array('i', list(range(10)), lock=None)
2139        lock2 = arr2.get_lock()
2140        obj2 = arr2.get_obj()
2141
2142        lock = self.Lock()
2143        arr3 = self.Array('i', list(range(10)), lock=lock)
2144        lock3 = arr3.get_lock()
2145        obj3 = arr3.get_obj()
2146        self.assertEqual(lock, lock3)
2147
2148        arr4 = self.Array('i', range(10), lock=False)
2149        self.assertFalse(hasattr(arr4, 'get_lock'))
2150        self.assertFalse(hasattr(arr4, 'get_obj'))
2151        self.assertRaises(AttributeError,
2152                          self.Array, 'i', range(10), lock='notalock')
2153
2154        arr5 = self.RawArray('i', range(10))
2155        self.assertFalse(hasattr(arr5, 'get_lock'))
2156        self.assertFalse(hasattr(arr5, 'get_obj'))
2157
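# Illustrative sketch (hypothetical _demo_* helpers) of the shared-array
# behaviour tested above: Array('i', size) is zero-initialized, supports
# slice assignment, and writes made in a child process are visible to the
# parent because the elements live in shared memory.
def _demo_array_child(arr):
    for i in range(len(arr)):
        arr[i] += 1

def _demo_array():
    arr = multiprocessing.Array('i', 5)        # starts as [0, 0, 0, 0, 0]
    arr[:] = range(5)
    p = multiprocessing.Process(target=_demo_array_child, args=(arr,))
    p.start()
    p.join()
    assert list(arr) == [1, 2, 3, 4, 5]
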
2158#
2159#
2160#
2161
2162class _TestContainers(BaseTestCase):
2163
2164    ALLOWED_TYPES = ('manager',)
2165
2166    def test_list(self):
2167        a = self.list(list(range(10)))
2168        self.assertEqual(a[:], list(range(10)))
2169
2170        b = self.list()
2171        self.assertEqual(b[:], [])
2172
2173        b.extend(list(range(5)))
2174        self.assertEqual(b[:], list(range(5)))
2175
2176        self.assertEqual(b[2], 2)
2177        self.assertEqual(b[2:10], [2, 3, 4])
2178
2179        b *= 2
2180        self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
2181
2182        self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
2183
2184        self.assertEqual(a[:], list(range(10)))
2185
2186        d = [a, b]
2187        e = self.list(d)
2188        self.assertEqual(
2189            [element[:] for element in e],
2190            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
2191            )
2192
2193        f = self.list([a])
2194        a.append('hello')
2195        self.assertEqual(f[0][:], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello'])
2196
2197    def test_list_iter(self):
2198        a = self.list(list(range(10)))
2199        it = iter(a)
2200        self.assertEqual(list(it), list(range(10)))
2201        self.assertEqual(list(it), [])  # exhausted
2202        # list modified during iteration
2203        it = iter(a)
2204        a[0] = 100
2205        self.assertEqual(next(it), 100)
2206
2207    def test_list_proxy_in_list(self):
2208        a = self.list([self.list(range(3)) for _i in range(3)])
2209        self.assertEqual([inner[:] for inner in a], [[0, 1, 2]] * 3)
2210
2211        a[0][-1] = 55
2212        self.assertEqual(a[0][:], [0, 1, 55])
2213        for i in range(1, 3):
2214            self.assertEqual(a[i][:], [0, 1, 2])
2215
2216        self.assertEqual(a[1].pop(), 2)
2217        self.assertEqual(len(a[1]), 2)
2218        for i in range(0, 3, 2):
2219            self.assertEqual(len(a[i]), 3)
2220
2221        del a
2222
2223        b = self.list()
2224        b.append(b)
2225        del b
2226
2227    def test_dict(self):
2228        d = self.dict()
2229        indices = list(range(65, 70))
2230        for i in indices:
2231            d[i] = chr(i)
2232        self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
2233        self.assertEqual(sorted(d.keys()), indices)
2234        self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
2235        self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
2236
2237    def test_dict_iter(self):
2238        d = self.dict()
2239        indices = list(range(65, 70))
2240        for i in indices:
2241            d[i] = chr(i)
2242        it = iter(d)
2243        self.assertEqual(list(it), indices)
2244        self.assertEqual(list(it), [])  # exhausted
2245        # dictionary changed size during iteration
2246        it = iter(d)
2247        d.clear()
2248        self.assertRaises(RuntimeError, next, it)
2249
2250    def test_dict_proxy_nested(self):
2251        pets = self.dict(ferrets=2, hamsters=4)
2252        supplies = self.dict(water=10, feed=3)
2253        d = self.dict(pets=pets, supplies=supplies)
2254
2255        self.assertEqual(supplies['water'], 10)
2256        self.assertEqual(d['supplies']['water'], 10)
2257
2258        d['supplies']['blankets'] = 5
2259        self.assertEqual(supplies['blankets'], 5)
2260        self.assertEqual(d['supplies']['blankets'], 5)
2261
2262        d['supplies']['water'] = 7
2263        self.assertEqual(supplies['water'], 7)
2264        self.assertEqual(d['supplies']['water'], 7)
2265
2266        del pets
2267        del supplies
2268        self.assertEqual(d['pets']['ferrets'], 2)
2269        d['supplies']['blankets'] = 11
2270        self.assertEqual(d['supplies']['blankets'], 11)
2271
2272        pets = d['pets']
2273        supplies = d['supplies']
2274        supplies['water'] = 7
2275        self.assertEqual(supplies['water'], 7)
2276        self.assertEqual(d['supplies']['water'], 7)
2277
2278        d.clear()
2279        self.assertEqual(len(d), 0)
2280        self.assertEqual(supplies['water'], 7)
2281        self.assertEqual(pets['hamsters'], 4)
2282
2283        l = self.list([pets, supplies])
2284        l[0]['marmots'] = 1
2285        self.assertEqual(pets['marmots'], 1)
2286        self.assertEqual(l[0]['marmots'], 1)
2287
2288        del pets
2289        del supplies
2290        self.assertEqual(l[0]['marmots'], 1)
2291
2292        outer = self.list([[88, 99], l])
2293        self.assertIsInstance(outer[0], list)  # Not a ListProxy
2294        self.assertEqual(outer[-1][-1]['feed'], 3)
2295
2296    def test_nested_queue(self):
2297        a = self.list()  # Test queue inside list
2298        a.append(self.Queue())
2299        a[0].put(123)
2300        self.assertEqual(a[0].get(), 123)
2301        b = self.dict()  # Test queue inside dict
2302        b[0] = self.Queue()
2303        b[0].put(456)
2304        self.assertEqual(b[0].get(), 456)
2305
2306    def test_namespace(self):
2307        n = self.Namespace()
2308        n.name = 'Bob'
2309        n.job = 'Builder'
2310        n._hidden = 'hidden'
2311        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
2312        del n.job
2313        self.assertEqual(str(n), "Namespace(name='Bob')")
2314        self.assertTrue(hasattr(n, 'name'))
2315        self.assertTrue(not hasattr(n, 'job'))
2316
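# Illustrative sketch (hypothetical helper) of the proxy behaviour exercised
# by _TestContainers: a manager hands out proxies whose mutations happen in
# the manager process, so a proxy nested inside another managed container
# stays live even after the local reference goes away.
def _demo_manager_containers():
    with multiprocessing.Manager() as m:
        d = m.dict(pets=m.dict(ferrets=2))     # proxy nested inside a proxy
        d['pets']['ferrets'] += 1              # mutates in the manager process
        lst = m.list(range(3))
        lst[0] = 100
        return d['pets']['ferrets'], lst[:]    # -> (3, [100, 1, 2])
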
2317#
2318#
2319#
2320
2321def sqr(x, wait=0.0):
2322    time.sleep(wait)
2323    return x*x
2324
2325def mul(x, y):
2326    return x*y
2327
2328def raise_large_valuerror(wait):
2329    time.sleep(wait)
2330    raise ValueError("x" * 1024**2)
2331
2332def identity(x):
2333    return x
2334
2335class CountedObject(object):
2336    n_instances = 0
2337
2338    def __new__(cls):
2339        cls.n_instances += 1
2340        return object.__new__(cls)
2341
2342    def __del__(self):
2343        type(self).n_instances -= 1
2344
2345class SayWhenError(ValueError): pass
2346
2347def exception_throwing_generator(total, when):
2348    if when == -1:
2349        raise SayWhenError("Somebody said when")
2350    for i in range(total):
2351        if i == when:
2352            raise SayWhenError("Somebody said when")
2353        yield i
2354
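# Illustrative sketch (hypothetical helper) of the basic Pool entry points
# the tests below exercise, reusing the picklable workers sqr and mul
# defined above.
def _demo_pool_basics():
    with multiprocessing.Pool(2) as pool:
        assert pool.apply(sqr, (5,)) == 25
        assert pool.map(sqr, range(4)) == [0, 1, 4, 9]
        assert list(pool.imap(sqr, range(4))) == [0, 1, 4, 9]
        assert pool.starmap(mul, [(2, 3), (4, 5)]) == [6, 20]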
2355
2356class _TestPool(BaseTestCase):
2357
2358    @classmethod
2359    def setUpClass(cls):
2360        super().setUpClass()
2361        cls.pool = cls.Pool(4)
2362
2363    @classmethod
2364    def tearDownClass(cls):
2365        cls.pool.terminate()
2366        cls.pool.join()
2367        cls.pool = None
2368        super().tearDownClass()
2369
2370    def test_apply(self):
2371        papply = self.pool.apply
2372        self.assertEqual(papply(sqr, (5,)), sqr(5))
2373        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
2374
2375    def test_map(self):
2376        pmap = self.pool.map
2377        self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
2378        self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
2379                         list(map(sqr, list(range(100)))))
2380
2381    def test_starmap(self):
2382        psmap = self.pool.starmap
2383        tuples = list(zip(range(10), range(9, -1, -1)))
2384        self.assertEqual(psmap(mul, tuples),
2385                         list(itertools.starmap(mul, tuples)))
2386        tuples = list(zip(range(100), range(99, -1, -1)))
2387        self.assertEqual(psmap(mul, tuples, chunksize=20),
2388                         list(itertools.starmap(mul, tuples)))
2389
2390    def test_starmap_async(self):
2391        tuples = list(zip(range(100), range(99, -1, -1)))
2392        self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
2393                         list(itertools.starmap(mul, tuples)))
2394
2395    def test_map_async(self):
2396        self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(),
2397                         list(map(sqr, list(range(10)))))
2398
2399    def test_map_async_callbacks(self):
2400        call_args = self.manager.list() if self.TYPE == 'manager' else []
2401        self.pool.map_async(int, ['1'],
2402                            callback=call_args.append,
2403                            error_callback=call_args.append).wait()
2404        self.assertEqual(1, len(call_args))
2405        self.assertEqual([1], call_args[0])
2406        self.pool.map_async(int, ['a'],
2407                            callback=call_args.append,
2408                            error_callback=call_args.append).wait()
2409        self.assertEqual(2, len(call_args))
2410        self.assertIsInstance(call_args[1], ValueError)
2411
2412    def test_map_unpicklable(self):
2413        # Issue #19425 -- failure to pickle should not cause a hang
2414        if self.TYPE == 'threads':
2415            self.skipTest('test not appropriate for {}'.format(self.TYPE))
2416        class A(object):
2417            def __reduce__(self):
2418                raise RuntimeError('cannot pickle')
2419        with self.assertRaises(RuntimeError):
2420            self.pool.map(sqr, [A()]*10)
2421
2422    def test_map_chunksize(self):
2423        try:
2424            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
2425        except multiprocessing.TimeoutError:
2426            self.fail("pool.map_async with chunksize stalled on an empty list")
2427
2428    def test_map_handle_iterable_exception(self):
2429        if self.TYPE == 'manager':
2430            self.skipTest('test not appropriate for {}'.format(self.TYPE))
2431
2432        # SayWhenError seen at the very start of the iterable
2433        with self.assertRaises(SayWhenError):
2434            self.pool.map(sqr, exception_throwing_generator(1, -1), 1)
2435        # again, make sure it's reentrant
2436        with self.assertRaises(SayWhenError):
2437            self.pool.map(sqr, exception_throwing_generator(1, -1), 1)
2438
2439        with self.assertRaises(SayWhenError):
2440            self.pool.map(sqr, exception_throwing_generator(10, 3), 1)
2441
2442        class SpecialIterable:
2443            def __iter__(self):
2444                return self
2445            def __next__(self):
2446                raise SayWhenError
2447            def __len__(self):
2448                return 1
2449        with self.assertRaises(SayWhenError):
2450            self.pool.map(sqr, SpecialIterable(), 1)
2451        with self.assertRaises(SayWhenError):
2452            self.pool.map(sqr, SpecialIterable(), 1)
2453
2454    def test_async(self):
2455        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
2456        get = TimingWrapper(res.get)
2457        self.assertEqual(get(), 49)
2458        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
2459
2460    def test_async_timeout(self):
2461        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0))
2462        get = TimingWrapper(res.get)
2463        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
2464        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
2465
2466    def test_imap(self):
2467        it = self.pool.imap(sqr, list(range(10)))
2468        self.assertEqual(list(it), list(map(sqr, list(range(10)))))
2469
2470        it = self.pool.imap(sqr, list(range(10)))
2471        for i in range(10):
2472            self.assertEqual(next(it), i*i)
2473        self.assertRaises(StopIteration, it.__next__)
2474
2475        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
2476        for i in range(1000):
2477            self.assertEqual(next(it), i*i)
2478        self.assertRaises(StopIteration, it.__next__)
2479
2480    def test_imap_handle_iterable_exception(self):
2481        if self.TYPE == 'manager':
2482            self.skipTest('test not appropriate for {}'.format(self.TYPE))
2483
2484        # SayWhenError seen at the very start of the iterable
2485        it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1)
2486        self.assertRaises(SayWhenError, it.__next__)
2487        # again, make sure it's reentrant
2488        it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1)
2489        self.assertRaises(SayWhenError, it.__next__)
2490
2491        it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
2492        for i in range(3):
2493            self.assertEqual(next(it), i*i)
2494        self.assertRaises(SayWhenError, it.__next__)
2495
2496        # SayWhenError seen at the start of the problematic chunk's results
2497        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2)
2498        for i in range(6):
2499            self.assertEqual(next(it), i*i)
2500        self.assertRaises(SayWhenError, it.__next__)
2501        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4)
2502        for i in range(4):
2503            self.assertEqual(next(it), i*i)
2504        self.assertRaises(SayWhenError, it.__next__)
2505
2506    def test_imap_unordered(self):
2507        it = self.pool.imap_unordered(sqr, list(range(10)))
2508        self.assertEqual(sorted(it), list(map(sqr, list(range(10)))))
2509
2510        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=100)
2511        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
2512
2513    def test_imap_unordered_handle_iterable_exception(self):
2514        if self.TYPE == 'manager':
2515            self.skipTest('test not appropriate for {}'.format(self.TYPE))
2516
2517        # SayWhenError seen at the very start of the iterable
2518        it = self.pool.imap_unordered(sqr,
2519                                      exception_throwing_generator(1, -1),
2520                                      1)
2521        self.assertRaises(SayWhenError, it.__next__)
2522        # again, make sure it's reentrant
2523        it = self.pool.imap_unordered(sqr,
2524                                      exception_throwing_generator(1, -1),
2525                                      1)
2526        self.assertRaises(SayWhenError, it.__next__)
2527
2528        it = self.pool.imap_unordered(sqr,
2529                                      exception_throwing_generator(10, 3),
2530                                      1)
2531        expected_values = list(map(sqr, list(range(10))))
2532        with self.assertRaises(SayWhenError):
2533            # imap_unordered makes it difficult to anticipate the SayWhenError
2534            for i in range(10):
2535                value = next(it)
2536                self.assertIn(value, expected_values)
2537                expected_values.remove(value)
2538
2539        it = self.pool.imap_unordered(sqr,
2540                                      exception_throwing_generator(20, 7),
2541                                      2)
2542        expected_values = list(map(sqr, list(range(20))))
2543        with self.assertRaises(SayWhenError):
2544            for i in range(20):
2545                value = next(it)
2546                self.assertIn(value, expected_values)
2547                expected_values.remove(value)
2548
2549    def test_make_pool(self):
2550        expected_error = (RemoteError if self.TYPE == 'manager'
2551                          else ValueError)
2552
2553        self.assertRaises(expected_error, self.Pool, -1)
2554        self.assertRaises(expected_error, self.Pool, 0)
2555
2556        if self.TYPE != 'manager':
2557            p = self.Pool(3)
2558            try:
2559                self.assertEqual(3, len(p._pool))
2560            finally:
2561                p.close()
2562                p.join()
2563
2564    def test_terminate(self):
2565        result = self.pool.map_async(
2566            time.sleep, [0.1 for i in range(10000)], chunksize=1
2567            )
2568        self.pool.terminate()
2569        join = TimingWrapper(self.pool.join)
2570        join()
2571        # Sanity check the pool didn't wait for all tasks to finish
2572        self.assertLess(join.elapsed, 2.0)
2573
2574    def test_empty_iterable(self):
2575        # See Issue 12157
2576        p = self.Pool(1)
2577
2578        self.assertEqual(p.map(sqr, []), [])
2579        self.assertEqual(list(p.imap(sqr, [])), [])
2580        self.assertEqual(list(p.imap_unordered(sqr, [])), [])
2581        self.assertEqual(p.map_async(sqr, []).get(), [])
2582
2583        p.close()
2584        p.join()
2585
2586    def test_context(self):
2587        if self.TYPE == 'processes':
2588            L = list(range(10))
2589            expected = [sqr(i) for i in L]
2590            with self.Pool(2) as p:
2591                r = p.map_async(sqr, L)
2592                self.assertEqual(r.get(), expected)
2593            p.join()
2594            self.assertRaises(ValueError, p.map_async, sqr, L)
2595
2596    @classmethod
2597    def _test_traceback(cls):
2598        raise RuntimeError(123) # some comment
2599
2600    def test_traceback(self):
2601        # We want to ensure that the traceback from the child process is
2602        # contained in the traceback raised in the main process.
2603        if self.TYPE == 'processes':
2604            with self.Pool(1) as p:
2605                try:
2606                    p.apply(self._test_traceback)
2607                except Exception as e:
2608                    exc = e
2609                else:
2610                    self.fail('expected RuntimeError')
2611            p.join()
2612            self.assertIs(type(exc), RuntimeError)
2613            self.assertEqual(exc.args, (123,))
2614            cause = exc.__cause__
2615            self.assertIs(type(cause), multiprocessing.pool.RemoteTraceback)
2616            self.assertIn('raise RuntimeError(123) # some comment', cause.tb)
2617
2618            with test.support.captured_stderr() as f1:
2619                try:
2620                    raise exc
2621                except RuntimeError:
2622                    sys.excepthook(*sys.exc_info())
2623            self.assertIn('raise RuntimeError(123) # some comment',
2624                          f1.getvalue())
2625            # _helper_reraises_exception should not make the error
2626            # a remote exception
2627            with self.Pool(1) as p:
2628                try:
2629                    p.map(sqr, exception_throwing_generator(1, -1), 1)
2630                except Exception as e:
2631                    exc = e
2632                else:
2633                    self.fail('expected SayWhenError')
2634                self.assertIs(type(exc), SayWhenError)
2635                self.assertIs(exc.__cause__, None)
2636            p.join()
2637
2638    @classmethod
2639    def _test_wrapped_exception(cls):
2640        raise RuntimeError('foo')
2641
2642    def test_wrapped_exception(self):
2643        # Issue #20980: Should not wrap exception when using thread pool
2644        with self.Pool(1) as p:
2645            with self.assertRaises(RuntimeError):
2646                p.apply(self._test_wrapped_exception)
2647        p.join()
2648
2649    def test_map_no_failfast(self):
2650        # Issue #23992: the fail-fast behaviour when an exception is raised
2651        # during map() would make Pool.join() deadlock, because a worker
2652        # process would fill the result queue (after the result handler thread
2653        # terminated, hence not draining it anymore).
2654
2655        t_start = time.monotonic()
2656
2657        with self.assertRaises(ValueError):
2658            with self.Pool(2) as p:
2659                try:
2660                    p.map(raise_large_valuerror, [0, 1])
2661                finally:
2662                    time.sleep(0.5)
2663                    p.close()
2664                    p.join()
2665
2666        # check that we indeed waited for all jobs
2667        self.assertGreater(time.monotonic() - t_start, 0.9)
2668
2669    def test_release_task_refs(self):
2670        # Issue #29861: task arguments and results should not be kept
2671        # alive after we are done with them.
2672        objs = [CountedObject() for i in range(10)]
2673        refs = [weakref.ref(o) for o in objs]
2674        self.pool.map(identity, objs)
2675
2676        del objs
2677        gc.collect()  # For PyPy or other GCs.
2678        time.sleep(DELTA)  # let threaded cleanup code run
2679        self.assertEqual(set(wr() for wr in refs), {None})
2680        # With a process pool, copies of the objects are returned, check
2681        # they were released too.
2682        self.assertEqual(CountedObject.n_instances, 0)
2683
2684    def test_enter(self):
2685        if self.TYPE == 'manager':
2686            self.skipTest("test not applicable to manager")
2687
2688        pool = self.Pool(1)
2689        with pool:
2690            pass
2691            # exiting the context manager calls pool.terminate()
2692        # the pool is no longer running
2693
2694        with self.assertRaises(ValueError):
2695            # bpo-35477: pool.__enter__() fails if the pool is not running
2696            with pool:
2697                pass
2698        pool.join()
2699
2700    def test_resource_warning(self):
2701        if self.TYPE == 'manager':
2702            self.skipTest("test not applicable to manager")
2703
2704        pool = self.Pool(1)
2705        pool.terminate()
2706        pool.join()
2707
2708        # force state to RUN to emit ResourceWarning in __del__()
2709        pool._state = multiprocessing.pool.RUN
2710
2711        with warnings_helper.check_warnings(
2712                ('unclosed running multiprocessing pool', ResourceWarning)):
2713            pool = None
2714            support.gc_collect()
2715
2716def raising():
2717    raise KeyError("key")
2718
2719def unpickleable_result():
2720    return lambda: 42
2721
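# Illustrative sketch (hypothetical helper) of the error_callback protocol
# tested below: when an async task raises, the pool delivers the exception
# object to error_callback and re-raises it from AsyncResult.get().
def _demo_error_callback():
    errors = []
    with multiprocessing.Pool(1) as pool:
        res = pool.apply_async(raising, error_callback=errors.append)
        try:
            res.get(timeout=60)
        except KeyError:
            pass                               # also re-raised in the caller
    return errors                              # [KeyError('key')]
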
2722class _TestPoolWorkerErrors(BaseTestCase):
2723    ALLOWED_TYPES = ('processes', )
2724
2725    def test_async_error_callback(self):
2726        p = multiprocessing.Pool(2)
2727
2728        scratchpad = [None]
2729        def errback(exc):
2730            scratchpad[0] = exc
2731
2732        res = p.apply_async(raising, error_callback=errback)
2733        self.assertRaises(KeyError, res.get)
2734        self.assertTrue(scratchpad[0])
2735        self.assertIsInstance(scratchpad[0], KeyError)
2736
2737        p.close()
2738        p.join()
2739
2740    def test_unpickleable_result(self):
2741        from multiprocessing.pool import MaybeEncodingError
2742        p = multiprocessing.Pool(2)
2743
2744        # Make sure we don't lose pool processes because of encoding errors.
2745        for iteration in range(20):
2746
2747            scratchpad = [None]
2748            def errback(exc):
2749                scratchpad[0] = exc
2750
2751            res = p.apply_async(unpickleable_result, error_callback=errback)
2752            self.assertRaises(MaybeEncodingError, res.get)
2753            wrapped = scratchpad[0]
2754            self.assertTrue(wrapped)
2755            self.assertIsInstance(scratchpad[0], MaybeEncodingError)
2756            self.assertIsNotNone(wrapped.exc)
2757            self.assertIsNotNone(wrapped.value)
2758
2759        p.close()
2760        p.join()
2761
2762class _TestPoolWorkerLifetime(BaseTestCase):
2763    ALLOWED_TYPES = ('processes', )
2764
2765    def test_pool_worker_lifetime(self):
2766        p = multiprocessing.Pool(3, maxtasksperchild=10)
2767        self.assertEqual(3, len(p._pool))
2768        origworkerpids = [w.pid for w in p._pool]
2769        # Run many tasks so each worker gets replaced (hopefully)
2770        results = []
2771        for i in range(100):
2772            results.append(p.apply_async(sqr, (i, )))
2773        # Fetch the results and verify we got the right answers,
2774        # also ensuring all the tasks have completed.
2775        for (j, res) in enumerate(results):
2776            self.assertEqual(res.get(), sqr(j))
2777        # Refill the pool
2778        p._repopulate_pool()
2779        # Wait until all workers are alive
2780        # (countdown * DELTA = 5 seconds max startup process time)
2781        countdown = 50
2782        while countdown and not all(w.is_alive() for w in p._pool):
2783            countdown -= 1
2784            time.sleep(DELTA)
2785        finalworkerpids = [w.pid for w in p._pool]
2786        # All pids should be assigned.  See issue #7805.
2787        self.assertNotIn(None, origworkerpids)
2788        self.assertNotIn(None, finalworkerpids)
2789        # Finally, check that the worker pids have changed
2790        self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
2791        p.close()
2792        p.join()
2793
2794    def test_pool_worker_lifetime_early_close(self):
2795        # Issue #10332: closing a pool whose workers have limited lifetimes
2796        # before all the tasks completed would make join() hang.
2797        p = multiprocessing.Pool(3, maxtasksperchild=1)
2798        results = []
2799        for i in range(6):
2800            results.append(p.apply_async(sqr, (i, 0.3)))
2801        p.close()
2802        p.join()
2803        # check the results
2804        for (j, res) in enumerate(results):
2805            self.assertEqual(res.get(), sqr(j))
2806
2807    def test_worker_finalization_via_atexit_handler_of_multiprocessing(self):
2808        # test cases against bpo-38744 and bpo-39360
2809        cmd = '''if 1:
2810            from multiprocessing import Pool
2811            problem = None
2812            class A:
2813                def __init__(self):
2814                    self.pool = Pool(processes=1)
2815            def test():
2816                global problem
2817                problem = A()
2818                problem.pool.map(float, tuple(range(10)))
2819            if __name__ == "__main__":
2820                test()
2821        '''
2822        rc, out, err = test.support.script_helper.assert_python_ok('-c', cmd)
2823        self.assertEqual(rc, 0)
2824
2825#
2826# Test of creating a customized manager class
2827#
2828
2829from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
2830
2831class FooBar(object):
2832    def f(self):
2833        return 'f()'
2834    def g(self):
2835        raise ValueError
2836    def _h(self):
2837        return '_h()'
2838
2839def baz():
2840    for i in range(10):
2841        yield i*i
2842
2843class IteratorProxy(BaseProxy):
2844    _exposed_ = ('__next__',)
2845    def __iter__(self):
2846        return self
2847    def __next__(self):
2848        return self._callmethod('__next__')
2849
2850class MyManager(BaseManager):
2851    pass
2852
2853MyManager.register('Foo', callable=FooBar)
2854MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
2855MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
2856
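# Illustrative sketch (hypothetical helper) of driving the customized
# manager directly: start() spawns the manager process, Foo() returns an
# auto-generated proxy exposing FooBar's public methods, and baz() is
# iterated through the hand-written IteratorProxy.
def _demo_mymanager():
    with MyManager() as manager:
        foo = manager.Foo()
        assert foo.f() == 'f()'
        assert list(manager.baz()) == [i * i for i in range(10)]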
2857
2858class _TestMyManager(BaseTestCase):
2859
2860    ALLOWED_TYPES = ('manager',)
2861
2862    def test_mymanager(self):
2863        manager = MyManager()
2864        manager.start()
2865        self.common(manager)
2866        manager.shutdown()
2867
2868        # bpo-30356: BaseManager._finalize_manager() sends SIGTERM
2869        # to the manager process if it takes longer than 1 second to stop,
2870        # which happens on slow buildbots.
2871        self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM))
2872
2873    def test_mymanager_context(self):
2874        with MyManager() as manager:
2875            self.common(manager)
2876        # bpo-30356: BaseManager._finalize_manager() sends SIGTERM
2877        # to the manager process if it takes longer than 1 second to stop,
2878        # which happens on slow buildbots.
2879        self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM))
2880
2881    def test_mymanager_context_prestarted(self):
2882        manager = MyManager()
2883        manager.start()
2884        with manager:
2885            self.common(manager)
2886        self.assertEqual(manager._process.exitcode, 0)
2887
2888    def common(self, manager):
2889        foo = manager.Foo()
2890        bar = manager.Bar()
2891        baz = manager.baz()
2892
2893        foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
2894        bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
2895
2896        self.assertEqual(foo_methods, ['f', 'g'])
2897        self.assertEqual(bar_methods, ['f', '_h'])
2898
2899        self.assertEqual(foo.f(), 'f()')
2900        self.assertRaises(ValueError, foo.g)
2901        self.assertEqual(foo._callmethod('f'), 'f()')
2902        self.assertRaises(RemoteError, foo._callmethod, '_h')
2903
2904        self.assertEqual(bar.f(), 'f()')
2905        self.assertEqual(bar._h(), '_h()')
2906        self.assertEqual(bar._callmethod('f'), 'f()')
2907        self.assertEqual(bar._callmethod('_h'), '_h()')
2908
2909        self.assertEqual(list(baz), [i*i for i in range(10)])
2910
2911
2912#
2913# Test of connecting to a remote server and using xmlrpclib for serialization
2914#
2915
2916_queue = pyqueue.Queue()
2917def get_queue():
2918    return _queue
2919
2920class QueueManager(BaseManager):
2921    '''manager class used by server process'''
2922QueueManager.register('get_queue', callable=get_queue)
2923
2924class QueueManager2(BaseManager):
2925    '''manager class which specifies the same interface as QueueManager'''
2926QueueManager2.register('get_queue')
2927
2928
2929SERIALIZER = 'xmlrpclib'
2930
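# Illustrative sketch (hypothetical helper) of the server/client split
# tested below: one manager owns the queue and serves it, while a second
# manager class registered with the same name connects as a client.  The
# xmlrpclib serializer restricts values to what XML-RPC can marshal.
def _demo_remote_queue():
    authkey = os.urandom(32)
    server = QueueManager(address=('127.0.0.1', 0), authkey=authkey,
                          serializer=SERIALIZER)
    server.start()
    try:
        client = QueueManager2(address=server.address, authkey=authkey,
                               serializer=SERIALIZER)
        client.connect()
        client.get_queue().put('ping')
        assert server.get_queue().get() == 'ping'   # same underlying queue
    finally:
        server.shutdown()
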
2931class _TestRemoteManager(BaseTestCase):
2932
2933    ALLOWED_TYPES = ('manager',)
2934    values = ['hello world', None, True, 2.25,
2935              'hall\xe5 v\xe4rlden',
2936              '\u043f\u0440\u0438\u0432\u0456\u0442 \u0441\u0432\u0456\u0442',
2937              b'hall\xe5 v\xe4rlden',
2938             ]
2939    result = values[:]
2940
2941    @classmethod
2942    def _putter(cls, address, authkey):
2943        manager = QueueManager2(
2944            address=address, authkey=authkey, serializer=SERIALIZER
2945            )
2946        manager.connect()
2947        queue = manager.get_queue()
2948        # Note that xmlrpclib will deserialize the object as a list, not a tuple
2949        queue.put(tuple(cls.values))
2950
2951    def test_remote(self):
2952        authkey = os.urandom(32)
2953
2954        manager = QueueManager(
2955            address=(socket_helper.HOST, 0), authkey=authkey, serializer=SERIALIZER
2956            )
2957        manager.start()
2958        self.addCleanup(manager.shutdown)
2959
2960        p = self.Process(target=self._putter, args=(manager.address, authkey))
2961        p.daemon = True
2962        p.start()
2963
2964        manager2 = QueueManager2(
2965            address=manager.address, authkey=authkey, serializer=SERIALIZER
2966            )
2967        manager2.connect()
2968        queue = manager2.get_queue()
2969
2970        self.assertEqual(queue.get(), self.result)
2971
2972        # Because we are using xmlrpclib for serialization instead of
2973        # pickle this will cause a serialization error.
2974        self.assertRaises(Exception, queue.put, time.sleep)
2975
2976        # Make queue finalizer run before the server is stopped
2977        del queue
2978
2979
2980@hashlib_helper.requires_hashdigest('md5')
2981class _TestManagerRestart(BaseTestCase):
2982
2983    @classmethod
2984    def _putter(cls, address, authkey):
2985        manager = QueueManager(
2986            address=address, authkey=authkey, serializer=SERIALIZER)
2987        manager.connect()
2988        queue = manager.get_queue()
2989        queue.put('hello world')
2990
2991    def test_rapid_restart(self):
2992        authkey = os.urandom(32)
2993        manager = QueueManager(
2994            address=(socket_helper.HOST, 0), authkey=authkey, serializer=SERIALIZER)
2995        try:
2996            srvr = manager.get_server()
2997            addr = srvr.address
2998            # Close the connection.Listener socket which gets opened as part
2999            # of manager.get_server(). It's not needed for the test.
3000            srvr.listener.close()
3001            manager.start()
3002
3003            p = self.Process(target=self._putter, args=(manager.address, authkey))
3004            p.start()
3005            p.join()
3006            queue = manager.get_queue()
3007            self.assertEqual(queue.get(), 'hello world')
3008            del queue
3009        finally:
3010            if hasattr(manager, "shutdown"):
3011                manager.shutdown()
3012
3013        manager = QueueManager(
3014            address=addr, authkey=authkey, serializer=SERIALIZER)
3015        try:
3016            manager.start()
3017            self.addCleanup(manager.shutdown)
3018        except OSError as e:
3019            if e.errno != errno.EADDRINUSE:
3020                raise
3021            # Retry after some time, in case the old socket was lingering
3022            # (sporadic failure on buildbots)
3023            time.sleep(1.0)
3024            manager = QueueManager(
3025                address=addr, authkey=authkey, serializer=SERIALIZER)
3026            if hasattr(manager, "shutdown"):
3027                self.addCleanup(manager.shutdown)
3028
3029#
3030#
3031#
3032
3033SENTINEL = latin('')
3034
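# Illustrative sketch (hypothetical _demo_* helpers) of the echo protocol
# used by _TestConnection below: the child echoes byte messages back until
# it receives the empty SENTINEL message.
def _demo_echo_child(conn):
    for msg in iter(conn.recv_bytes, SENTINEL):
        conn.send_bytes(msg)
    conn.close()

def _demo_echo():
    conn, child_conn = multiprocessing.Pipe()
    p = multiprocessing.Process(target=_demo_echo_child, args=(child_conn,))
    p.daemon = True
    p.start()
    conn.send_bytes(b'hello')
    assert conn.recv_bytes() == b'hello'
    conn.send_bytes(SENTINEL)                  # tell the child to quit
    p.join()
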
3035class _TestConnection(BaseTestCase):
3036
3037    ALLOWED_TYPES = ('processes', 'threads')
3038
3039    @classmethod
3040    def _echo(cls, conn):
3041        for msg in iter(conn.recv_bytes, SENTINEL):
3042            conn.send_bytes(msg)
3043        conn.close()
3044
3045    def test_connection(self):
3046        conn, child_conn = self.Pipe()
3047
3048        p = self.Process(target=self._echo, args=(child_conn,))
3049        p.daemon = True
3050        p.start()
3051
3052        seq = [1, 2.25, None]
3053        msg = latin('hello world')
3054        longmsg = msg * 10
3055        arr = array.array('i', list(range(4)))
3056
3057        if self.TYPE == 'processes':
3058            self.assertEqual(type(conn.fileno()), int)
3059
3060        self.assertEqual(conn.send(seq), None)
3061        self.assertEqual(conn.recv(), seq)
3062
3063        self.assertEqual(conn.send_bytes(msg), None)
3064        self.assertEqual(conn.recv_bytes(), msg)
3065
3066        if self.TYPE == 'processes':
3067            buffer = array.array('i', [0]*10)
3068            expected = list(arr) + [0] * (10 - len(arr))
3069            self.assertEqual(conn.send_bytes(arr), None)
3070            self.assertEqual(conn.recv_bytes_into(buffer),
3071                             len(arr) * buffer.itemsize)
3072            self.assertEqual(list(buffer), expected)
3073
3074            buffer = array.array('i', [0]*10)
3075            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
3076            self.assertEqual(conn.send_bytes(arr), None)
3077            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
3078                             len(arr) * buffer.itemsize)
3079            self.assertEqual(list(buffer), expected)
3080
3081            buffer = bytearray(latin(' ' * 40))
3082            self.assertEqual(conn.send_bytes(longmsg), None)
3083            try:
3084                res = conn.recv_bytes_into(buffer)
3085            except multiprocessing.BufferTooShort as e:
3086                self.assertEqual(e.args, (longmsg,))
3087            else:
3088                self.fail('expected BufferTooShort, got %s' % res)
3089
3090        poll = TimingWrapper(conn.poll)
3091
3092        self.assertEqual(poll(), False)
3093        self.assertTimingAlmostEqual(poll.elapsed, 0)
3094
3095        self.assertEqual(poll(-1), False)
3096        self.assertTimingAlmostEqual(poll.elapsed, 0)
3097
3098        self.assertEqual(poll(TIMEOUT1), False)
3099        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
3100
3101        conn.send(None)
3102        time.sleep(0.1)
3103
3104        self.assertEqual(poll(TIMEOUT1), True)
3105        self.assertTimingAlmostEqual(poll.elapsed, 0)
3106
3107        self.assertEqual(conn.recv(), None)
3108
3109        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16 MiB
3110        conn.send_bytes(really_big_msg)
3111        self.assertEqual(conn.recv_bytes(), really_big_msg)
3112
3113        conn.send_bytes(SENTINEL)                          # tell child to quit
3114        child_conn.close()
3115
3116        if self.TYPE == 'processes':
3117            self.assertEqual(conn.readable, True)
3118            self.assertEqual(conn.writable, True)
3119            self.assertRaises(EOFError, conn.recv)
3120            self.assertRaises(EOFError, conn.recv_bytes)
3121
3122        p.join()
3123
3124    def test_duplex_false(self):
3125        reader, writer = self.Pipe(duplex=False)
3126        self.assertEqual(writer.send(1), None)
3127        self.assertEqual(reader.recv(), 1)
3128        if self.TYPE == 'processes':
3129            self.assertEqual(reader.readable, True)
3130            self.assertEqual(reader.writable, False)
3131            self.assertEqual(writer.readable, False)
3132            self.assertEqual(writer.writable, True)
3133            self.assertRaises(OSError, reader.send, 2)
3134            self.assertRaises(OSError, writer.recv)
3135            self.assertRaises(OSError, writer.poll)
3136
3137    def test_spawn_close(self):
3138        # We test that a pipe connection can be closed by parent
3139        # process immediately after child is spawned.  On Windows this
3140        # would have sometimes failed on old versions because
3141        # child_conn would be closed before the child got a chance to
3142        # duplicate it.
3143        conn, child_conn = self.Pipe()
3144
3145        p = self.Process(target=self._echo, args=(child_conn,))
3146        p.daemon = True
3147        p.start()
3148        child_conn.close()    # this might complete before child initializes
3149
3150        msg = latin('hello')
3151        conn.send_bytes(msg)
3152        self.assertEqual(conn.recv_bytes(), msg)
3153
3154        conn.send_bytes(SENTINEL)
3155        conn.close()
3156        p.join()
3157
3158    def test_sendbytes(self):
3159        if self.TYPE != 'processes':
3160            self.skipTest('test not appropriate for {}'.format(self.TYPE))
3161
3162        msg = latin('abcdefghijklmnopqrstuvwxyz')
3163        a, b = self.Pipe()
3164
3165        a.send_bytes(msg)
3166        self.assertEqual(b.recv_bytes(), msg)
3167
3168        a.send_bytes(msg, 5)
3169        self.assertEqual(b.recv_bytes(), msg[5:])
3170
3171        a.send_bytes(msg, 7, 8)
3172        self.assertEqual(b.recv_bytes(), msg[7:7+8])
3173
3174        a.send_bytes(msg, 26)
3175        self.assertEqual(b.recv_bytes(), latin(''))
3176
3177        a.send_bytes(msg, 26, 0)
3178        self.assertEqual(b.recv_bytes(), latin(''))
3179
3180        self.assertRaises(ValueError, a.send_bytes, msg, 27)
3181
3182        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
3183
3184        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
3185
3186        self.assertRaises(ValueError, a.send_bytes, msg, -1)
3187
3188        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
3189
3190    @classmethod
3191    def _is_fd_assigned(cls, fd):
3192        try:
3193            os.fstat(fd)
3194        except OSError as e:
3195            if e.errno == errno.EBADF:
3196                return False
3197            raise
3198        else:
3199            return True
3200
3201    @classmethod
3202    def _writefd(cls, conn, data, create_dummy_fds=False):
3203        if create_dummy_fds:
3204            for i in range(0, 256):
3205                if not cls._is_fd_assigned(i):
3206                    os.dup2(conn.fileno(), i)
3207        fd = reduction.recv_handle(conn)
3208        if msvcrt:
3209            fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
3210        os.write(fd, data)
3211        os.close(fd)
3212
3213    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
3214    def test_fd_transfer(self):
3215        if self.TYPE != 'processes':
3216            self.skipTest("only makes sense with processes")
3217        conn, child_conn = self.Pipe(duplex=True)
3218
3219        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
3220        p.daemon = True
3221        p.start()
3222        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
3223        with open(os_helper.TESTFN, "wb") as f:
3224            fd = f.fileno()
3225            if msvcrt:
3226                fd = msvcrt.get_osfhandle(fd)
3227            reduction.send_handle(conn, fd, p.pid)
3228        p.join()
3229        with open(os_helper.TESTFN, "rb") as f:
3230            self.assertEqual(f.read(), b"foo")
3231
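    # The pattern under test is the helper pair from
    # multiprocessing.reduction: send_handle(conn, handle, destination_pid)
    # in one process and recv_handle(conn) in the other.  Schematically:
    #
    #   parent:  reduction.send_handle(conn, f.fileno(), child.pid)
    #   child:   fd = reduction.recv_handle(conn)
    #            os.write(fd, b"foo")
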
3232    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
3233    @unittest.skipIf(sys.platform == "win32",
3234                     "test semantics don't make sense on Windows")
3235    @unittest.skipIf(MAXFD <= 256,
3236                     "largest assignable fd number is too small")
3237    @unittest.skipUnless(hasattr(os, "dup2"),
3238                         "test needs os.dup2()")
3239    def test_large_fd_transfer(self):
3240        # With fd > 256 (issue #11657)
3241        if self.TYPE != 'processes':
3242            self.skipTest("only makes sense with processes")
3243        conn, child_conn = self.Pipe(duplex=True)
3244
3245        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
3246        p.daemon = True
3247        p.start()
3248        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
3249        with open(os_helper.TESTFN, "wb") as f:
3250            fd = f.fileno()
3251            for newfd in range(256, MAXFD):
3252                if not self._is_fd_assigned(newfd):
3253                    break
3254            else:
3255                self.fail("could not find an unassigned large file descriptor")
3256            os.dup2(fd, newfd)
3257            try:
3258                reduction.send_handle(conn, newfd, p.pid)
3259            finally:
3260                os.close(newfd)
3261        p.join()
3262        with open(os_helper.TESTFN, "rb") as f:
3263            self.assertEqual(f.read(), b"bar")
3264
3265    @classmethod
3266    def _send_data_without_fd(cls, conn):
3267        os.write(conn.fileno(), b"\0")
3268
3269    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
3270    @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
3271    def test_missing_fd_transfer(self):
3272        # Check that exception is raised when received data is not
3273        # accompanied by a file descriptor in ancillary data.
3274        if self.TYPE != 'processes':
3275            self.skipTest("only makes sense with processes")
3276        conn, child_conn = self.Pipe(duplex=True)
3277
3278        p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
3279        p.daemon = True
3280        p.start()
3281        self.assertRaises(RuntimeError, reduction.recv_handle, conn)
3282        p.join()
3283
3284    def test_context(self):
3285        a, b = self.Pipe()
3286
3287        with a, b:
3288            a.send(1729)
3289            self.assertEqual(b.recv(), 1729)
3290            if self.TYPE == 'processes':
3291                self.assertFalse(a.closed)
3292                self.assertFalse(b.closed)
3293
3294        if self.TYPE == 'processes':
3295            self.assertTrue(a.closed)
3296            self.assertTrue(b.closed)
3297            self.assertRaises(OSError, a.recv)
3298            self.assertRaises(OSError, b.recv)
3299
3300class _TestListener(BaseTestCase):
3301
3302    ALLOWED_TYPES = ('processes',)
3303
3304    def test_multiple_bind(self):
3305        for family in self.connection.families:
3306            l = self.connection.Listener(family=family)
3307            self.addCleanup(l.close)
3308            self.assertRaises(OSError, self.connection.Listener,
3309                              l.address, family)
3310
3311    def test_context(self):
3312        with self.connection.Listener() as l:
3313            with self.connection.Client(l.address) as c:
3314                with l.accept() as d:
3315                    c.send(1729)
3316                    self.assertEqual(d.recv(), 1729)
3317
3318        if self.TYPE == 'processes':
3319            self.assertRaises(OSError, l.accept)
3320
3321    @unittest.skipUnless(util.abstract_sockets_supported,
3322                         "test needs abstract socket support")
3323    def test_abstract_socket(self):
3324        with self.connection.Listener("\0something") as listener:
3325            with self.connection.Client(listener.address) as client:
3326                with listener.accept() as d:
3327                    client.send(1729)
3328                    self.assertEqual(d.recv(), 1729)
3329
3330        if self.TYPE == 'processes':
3331            self.assertRaises(OSError, listener.accept)
3332
3333
3334class _TestListenerClient(BaseTestCase):
3335
3336    ALLOWED_TYPES = ('processes', 'threads')
3337
3338    @classmethod
3339    def _test(cls, address):
3340        conn = cls.connection.Client(address)
3341        conn.send('hello')
3342        conn.close()
3343
3344    def test_listener_client(self):
3345        for family in self.connection.families:
3346            l = self.connection.Listener(family=family)
3347            p = self.Process(target=self._test, args=(l.address,))
3348            p.daemon = True
3349            p.start()
3350            conn = l.accept()
3351            self.assertEqual(conn.recv(), 'hello')
3352            p.join()
3353            l.close()
3354
3355    def test_issue14725(self):
3356        l = self.connection.Listener()
3357        p = self.Process(target=self._test, args=(l.address,))
3358        p.daemon = True
3359        p.start()
3360        time.sleep(1)
3361        # On Windows the client process should by now have connected,
3362        # written data and closed the pipe handle.  This causes
3363        # ConnectNamedPipe() to fail with ERROR_NO_DATA.  See Issue
3364        # 14725.
3365        conn = l.accept()
3366        self.assertEqual(conn.recv(), 'hello')
3367        conn.close()
3368        p.join()
3369        l.close()
3370
3371    def test_issue16955(self):
3372        for fam in self.connection.families:
3373            l = self.connection.Listener(family=fam)
3374            c = self.connection.Client(l.address)
3375            a = l.accept()
3376            a.send_bytes(b"hello")
3377            self.assertTrue(c.poll(1))
3378            a.close()
3379            c.close()
3380            l.close()
3381
3382class _TestPoll(BaseTestCase):
3383
3384    ALLOWED_TYPES = ('processes', 'threads')
3385
3386    def test_empty_string(self):
3387        a, b = self.Pipe()
3388        self.assertEqual(a.poll(), False)
3389        b.send_bytes(b'')
3390        self.assertEqual(a.poll(), True)
3391        self.assertEqual(a.poll(), True)
3392
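    # Note: poll() with no argument uses the documented default timeout of
    # zero, i.e. it returns immediately, while poll(None) would block until
    # data arrives.  A zero-length message still counts as pending data,
    # which is what the assertions above rely on:
    #
    #   a.poll()       # non-blocking, like a.poll(0.0)
    #   a.poll(None)   # blocks until something is available
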
3393    @classmethod
3394    def _child_strings(cls, conn, strings):
3395        for s in strings:
3396            time.sleep(0.1)
3397            conn.send_bytes(s)
3398        conn.close()
3399
3400    def test_strings(self):
3401        strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
3402        a, b = self.Pipe()
3403        p = self.Process(target=self._child_strings, args=(b, strings))
3404        p.start()
3405
3406        for s in strings:
3407            for i in range(200):
3408                if a.poll(0.01):
3409                    break
3410            x = a.recv_bytes()
3411            self.assertEqual(s, x)
3412
3413        p.join()
3414
3415    @classmethod
3416    def _child_boundaries(cls, r):
3417        # Polling may "pull" a message into the child process, but we
3418        # don't want it to pull only part of a message, as that would
3419        # corrupt the pipe for any other processes which might later
3420        # read from it.
3421        r.poll(5)
3422
3423    def test_boundaries(self):
3424        r, w = self.Pipe(False)
3425        p = self.Process(target=self._child_boundaries, args=(r,))
3426        p.start()
3427        time.sleep(2)
3428        L = [b"first", b"second"]
3429        for obj in L:
3430            w.send_bytes(obj)
3431        w.close()
3432        p.join()
3433        self.assertIn(r.recv_bytes(), L)
3434
3435    @classmethod
3436    def _child_dont_merge(cls, b):
3437        b.send_bytes(b'a')
3438        b.send_bytes(b'b')
3439        b.send_bytes(b'cd')
3440
3441    def test_dont_merge(self):
3442        a, b = self.Pipe()
3443        self.assertEqual(a.poll(0.0), False)
3444        self.assertEqual(a.poll(0.1), False)
3445
3446        p = self.Process(target=self._child_dont_merge, args=(b,))
3447        p.start()
3448
3449        self.assertEqual(a.recv_bytes(), b'a')
3450        self.assertEqual(a.poll(1.0), True)
3451        self.assertEqual(a.poll(1.0), True)
3452        self.assertEqual(a.recv_bytes(), b'b')
3453        self.assertEqual(a.poll(1.0), True)
3454        self.assertEqual(a.poll(1.0), True)
3455        self.assertEqual(a.poll(0.0), True)
3456        self.assertEqual(a.recv_bytes(), b'cd')
3457
3458        p.join()
3459
3460#
3461# Test of sending connection and socket objects between processes
3462#
3463
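# The class below relies on Connection objects being picklable when they
# are sent over an existing Connection (the transfer is implemented by
# multiprocessing.reduction and, for processes, resource_sharer).  A rough
# sketch of the round trip, assuming the default start method:
#
#   parent:  r, w = multiprocessing.Pipe(duplex=False)
#            conn.send(w)        # the writable end is pickled for the child
#            w.close()
#            print(r.recv())     # 'hello'
#   child:   w = conn.recv()
#            w.send('hello')
#            w.close()
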
3464@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
3465@hashlib_helper.requires_hashdigest('md5')
3466class _TestPicklingConnections(BaseTestCase):
3467
3468    ALLOWED_TYPES = ('processes',)
3469
3470    @classmethod
3471    def tearDownClass(cls):
3472        from multiprocessing import resource_sharer
3473        resource_sharer.stop(timeout=support.LONG_TIMEOUT)
3474
3475    @classmethod
3476    def _listener(cls, conn, families):
3477        for fam in families:
3478            l = cls.connection.Listener(family=fam)
3479            conn.send(l.address)
3480            new_conn = l.accept()
3481            conn.send(new_conn)
3482            new_conn.close()
3483            l.close()
3484
3485        l = socket.create_server((socket_helper.HOST, 0))
3486        conn.send(l.getsockname())
3487        new_conn, addr = l.accept()
3488        conn.send(new_conn)
3489        new_conn.close()
3490        l.close()
3491
3492        conn.recv()
3493
3494    @classmethod
3495    def _remote(cls, conn):
3496        for (address, msg) in iter(conn.recv, None):
3497            client = cls.connection.Client(address)
3498            client.send(msg.upper())
3499            client.close()
3500
3501        address, msg = conn.recv()
3502        client = socket.socket()
3503        client.connect(address)
3504        client.sendall(msg.upper())
3505        client.close()
3506
3507        conn.close()
3508
3509    def test_pickling(self):
3510        families = self.connection.families
3511
3512        lconn, lconn0 = self.Pipe()
3513        lp = self.Process(target=self._listener, args=(lconn0, families))
3514        lp.daemon = True
3515        lp.start()
3516        lconn0.close()
3517
3518        rconn, rconn0 = self.Pipe()
3519        rp = self.Process(target=self._remote, args=(rconn0,))
3520        rp.daemon = True
3521        rp.start()
3522        rconn0.close()
3523
3524        for fam in families:
3525            msg = ('This connection uses family %s' % fam).encode('ascii')
3526            address = lconn.recv()
3527            rconn.send((address, msg))
3528            new_conn = lconn.recv()
3529            self.assertEqual(new_conn.recv(), msg.upper())
3530
3531        rconn.send(None)
3532
3533        msg = latin('This connection uses a normal socket')
3534        address = lconn.recv()
3535        rconn.send((address, msg))
3536        new_conn = lconn.recv()
3537        buf = []
3538        while True:
3539            s = new_conn.recv(100)
3540            if not s:
3541                break
3542            buf.append(s)
3543        buf = b''.join(buf)
3544        self.assertEqual(buf, msg.upper())
3545        new_conn.close()
3546
3547        lconn.send(None)
3548
3549        rconn.close()
3550        lconn.close()
3551
3552        lp.join()
3553        rp.join()
3554
3555    @classmethod
3556    def child_access(cls, conn):
3557        w = conn.recv()
3558        w.send('all is well')
3559        w.close()
3560
3561        r = conn.recv()
3562        msg = r.recv()
3563        conn.send(msg*2)
3564
3565        conn.close()
3566
3567    def test_access(self):
3568        # On Windows, if we do not specify a destination pid when
3569        # using DupHandle then we need to be careful to use the
3570        # correct access flags for DuplicateHandle(), or else
3571        # DupHandle.detach() will raise PermissionError.  For example,
3572        # for a read only pipe handle we should use
3573        # access=FILE_GENERIC_READ.  (Unfortunately
3574        # DUPLICATE_SAME_ACCESS does not work.)
3575        conn, child_conn = self.Pipe()
3576        p = self.Process(target=self.child_access, args=(child_conn,))
3577        p.daemon = True
3578        p.start()
3579        child_conn.close()
3580
3581        r, w = self.Pipe(duplex=False)
3582        conn.send(w)
3583        w.close()
3584        self.assertEqual(r.recv(), 'all is well')
3585        r.close()
3586
3587        r, w = self.Pipe(duplex=False)
3588        conn.send(r)
3589        r.close()
3590        w.send('foobar')
3591        w.close()
3592        self.assertEqual(conn.recv(), 'foobar'*2)
3593
3594        p.join()
3595
3596#
3597#
3598#
3599
3600class _TestHeap(BaseTestCase):
3601
3602    ALLOWED_TYPES = ('processes',)
3603
3604    def setUp(self):
3605        super().setUp()
3606        # Make pristine heap for these tests
3607        self.old_heap = multiprocessing.heap.BufferWrapper._heap
3608        multiprocessing.heap.BufferWrapper._heap = multiprocessing.heap.Heap()
3609
3610    def tearDown(self):
3611        multiprocessing.heap.BufferWrapper._heap = self.old_heap
3612        super().tearDown()
3613
3614    def test_heap(self):
3615        iterations = 5000
3616        maxblocks = 50
3617        blocks = []
3618
3619        # get the heap object
3620        heap = multiprocessing.heap.BufferWrapper._heap
3621        heap._DISCARD_FREE_SPACE_LARGER_THAN = 0
3622
3623        # create and destroy lots of blocks of different sizes
3624        for i in range(iterations):
3625            size = int(random.lognormvariate(0, 1) * 1000)
3626            b = multiprocessing.heap.BufferWrapper(size)
3627            blocks.append(b)
3628            if len(blocks) > maxblocks:
3629                i = random.randrange(maxblocks)
3630                del blocks[i]
3631            del b
3632
3633        # verify the state of the heap
3634        with heap._lock:
3635            all = []
3636            free = 0
3637            occupied = 0
3638            for L in list(heap._len_to_seq.values()):
3639                # count all free blocks in arenas
3640                for arena, start, stop in L:
3641                    all.append((heap._arenas.index(arena), start, stop,
3642                                stop-start, 'free'))
3643                    free += (stop-start)
3644            for arena, arena_blocks in heap._allocated_blocks.items():
3645                # count all allocated blocks in arenas
3646                for start, stop in arena_blocks:
3647                    all.append((heap._arenas.index(arena), start, stop,
3648                                stop-start, 'occupied'))
3649                    occupied += (stop-start)
3650
3651            self.assertEqual(free + occupied,
3652                             sum(arena.size for arena in heap._arenas))
3653
3654            all.sort()
3655
3656            for i in range(len(all)-1):
3657                (arena, start, stop) = all[i][:3]
3658                (narena, nstart, nstop) = all[i+1][:3]
3659                if arena != narena:
3660                    # Two different arenas
3661                    self.assertEqual(stop, heap._arenas[arena].size)  # last block
3662                    self.assertEqual(nstart, 0)         # first block
3663                else:
3664                    # Same arena: two adjacent blocks
3665                    self.assertEqual(stop, nstart)
3666
3667        # test free'ing all blocks
3668        random.shuffle(blocks)
3669        while blocks:
3670            blocks.pop()
3671
3672        self.assertEqual(heap._n_frees, heap._n_mallocs)
3673        self.assertEqual(len(heap._pending_free_blocks), 0)
3674        self.assertEqual(len(heap._arenas), 0)
3675        self.assertEqual(len(heap._allocated_blocks), 0, heap._allocated_blocks)
3676        self.assertEqual(len(heap._len_to_seq), 0)
3677
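    # multiprocessing.heap is private API, but the invariants checked above
    # reduce to: free plus occupied bytes equal the total arena space, and
    # the blocks of each arena tile it exactly, with no gaps or overlaps.
    # A block comes from the private wrapper:
    #
    #   b = multiprocessing.heap.BufferWrapper(512)
    #   mem = b.create_memoryview()   # the usable 512-byte buffer
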
3678    def test_free_from_gc(self):
3679        # Check that freeing of blocks by the garbage collector doesn't deadlock
3680        # (issue #12352).
3681        # Make sure the GC is enabled, and set lower collection thresholds to
3682        # make collections more frequent (and increase the probability of
3683        # deadlock).
3684        if not gc.isenabled():
3685            gc.enable()
3686            self.addCleanup(gc.disable)
3687        thresholds = gc.get_threshold()
3688        self.addCleanup(gc.set_threshold, *thresholds)
3689        gc.set_threshold(10)
3690
3691        # perform numerous block allocations, with cyclic references to make
3692        # sure objects are collected asynchronously by the gc
3693        for i in range(5000):
3694            a = multiprocessing.heap.BufferWrapper(1)
3695            b = multiprocessing.heap.BufferWrapper(1)
3696            # circular references
3697            a.buddy = b
3698            b.buddy = a
3699
3700#
3701#
3702#
3703
3704class _Foo(Structure):
3705    _fields_ = [
3706        ('x', c_int),
3707        ('y', c_double),
3708        ('z', c_longlong),
3709        ]
3710
3711class _TestSharedCTypes(BaseTestCase):
3712
3713    ALLOWED_TYPES = ('processes',)
3714
3715    def setUp(self):
3716        if not HAS_SHAREDCTYPES:
3717            self.skipTest("requires multiprocessing.sharedctypes")
3718
3719    @classmethod
3720    def _double(cls, x, y, z, foo, arr, string):
3721        x.value *= 2
3722        y.value *= 2
3723        z.value *= 2
3724        foo.x *= 2
3725        foo.y *= 2
3726        string.value *= 2
3727        for i in range(len(arr)):
3728            arr[i] *= 2
3729
3730    def test_sharedctypes(self, lock=False):
3731        x = Value('i', 7, lock=lock)
3732        y = Value(c_double, 1.0/3.0, lock=lock)
3733        z = Value(c_longlong, 2 ** 33, lock=lock)
3734        foo = Value(_Foo, 3, 2, lock=lock)
3735        arr = self.Array('d', list(range(10)), lock=lock)
3736        string = self.Array('c', 20, lock=lock)
3737        string.value = latin('hello')
3738
3739        p = self.Process(target=self._double, args=(x, y, z, foo, arr, string))
3740        p.daemon = True
3741        p.start()
3742        p.join()
3743
3744        self.assertEqual(x.value, 14)
3745        self.assertAlmostEqual(y.value, 2.0/3.0)
3746        self.assertEqual(z.value, 2 ** 34)
3747        self.assertEqual(foo.x, 6)
3748        self.assertAlmostEqual(foo.y, 4.0)
3749        for i in range(10):
3750            self.assertAlmostEqual(arr[i], i*2)
3751        self.assertEqual(string.value, latin('hellohello'))
3752
3753    def test_synchronize(self):
3754        self.test_sharedctypes(lock=True)
3755
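    # Value/Array accept lock=False (a raw, unsynchronized object),
    # lock=True (a fresh recursive lock) or an explicit lock object; when
    # synchronized, the wrapper exposes get_lock() and get_obj().  Typical
    # documented usage:
    #
    #   v = Value('i', 0, lock=True)
    #   with v.get_lock():
    #       v.value += 1
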
3756    def test_copy(self):
3757        foo = _Foo(2, 5.0, 2 ** 33)
3758        bar = copy(foo)
3759        foo.x = 0
3760        foo.y = 0
3761        foo.z = 0
3762        self.assertEqual(bar.x, 2)
3763        self.assertAlmostEqual(bar.y, 5.0)
3764        self.assertEqual(bar.z, 2 ** 33)
3765
3766
3767@unittest.skipUnless(HAS_SHMEM, "requires multiprocessing.shared_memory")
3768@hashlib_helper.requires_hashdigest('md5')
3769class _TestSharedMemory(BaseTestCase):
3770
3771    ALLOWED_TYPES = ('processes',)
3772
3773    @staticmethod
3774    def _attach_existing_shmem_then_write(shmem_name_or_obj, binary_data):
3775        if isinstance(shmem_name_or_obj, str):
3776            local_sms = shared_memory.SharedMemory(shmem_name_or_obj)
3777        else:
3778            local_sms = shmem_name_or_obj
3779        local_sms.buf[:len(binary_data)] = binary_data
3780        local_sms.close()
3781
3782    def _new_shm_name(self, prefix):
3783        # Add a PID to the name of a POSIX shared memory object to allow
3784        # running multiprocessing tests (test_multiprocessing_fork,
3785        # test_multiprocessing_spawn, etc) in parallel.
3786        return prefix + str(os.getpid())
3787
3788    def test_shared_memory_basics(self):
3789        name_tsmb = self._new_shm_name('test01_tsmb')
3790        sms = shared_memory.SharedMemory(name_tsmb, create=True, size=512)
3791        self.addCleanup(sms.unlink)
3792
3793        # Verify attributes are readable.
3794        self.assertEqual(sms.name, name_tsmb)
3795        self.assertGreaterEqual(sms.size, 512)
3796        self.assertGreaterEqual(len(sms.buf), sms.size)
3797
3798        # Verify __repr__
3799        self.assertIn(sms.name, str(sms))
3800        self.assertIn(str(sms.size), str(sms))
3801
3802        # Modify contents of shared memory segment through memoryview.
3803        sms.buf[0] = 42
3804        self.assertEqual(sms.buf[0], 42)
3805
3806        # Attach to existing shared memory segment.
3807        also_sms = shared_memory.SharedMemory(name_tsmb)
3808        self.assertEqual(also_sms.buf[0], 42)
3809        also_sms.close()
3810
3811        # Attach to existing shared memory segment but specify a new size.
3812        same_sms = shared_memory.SharedMemory(name_tsmb, size=20*sms.size)
3813        self.assertLess(same_sms.size, 20*sms.size)  # Size was ignored.
3814        same_sms.close()
3815
3816        # Creating a shared memory segment with a negative size
3817        with self.assertRaises(ValueError):
3818            shared_memory.SharedMemory(create=True, size=-2)
3819
3820        # Attaching to a shared memory segment without a name
3821        with self.assertRaises(ValueError):
3822            shared_memory.SharedMemory(create=False)
3823
3824        # Test that the shared memory segment is created properly
3825        # when _make_filename returns an existing shared memory segment name.
3826        with unittest.mock.patch(
3827            'multiprocessing.shared_memory._make_filename') as mock_make_filename:
3828
3829            NAME_PREFIX = shared_memory._SHM_NAME_PREFIX
3830            names = [self._new_shm_name('test01_fn'), self._new_shm_name('test02_fn')]
3831            # Prepend NAME_PREFIX ('/psm_' or 'wnsm_'); some POSIX-compliant
3832            # systems require the name to start with '/'.
3833            names = [NAME_PREFIX + name for name in names]
3834
3835            mock_make_filename.side_effect = names
3836            shm1 = shared_memory.SharedMemory(create=True, size=1)
3837            self.addCleanup(shm1.unlink)
3838            self.assertEqual(shm1._name, names[0])
3839
3840            mock_make_filename.side_effect = names
3841            shm2 = shared_memory.SharedMemory(create=True, size=1)
3842            self.addCleanup(shm2.unlink)
3843            self.assertEqual(shm2._name, names[1])
3844
3845        if shared_memory._USE_POSIX:
3846            # Posix Shared Memory can only be unlinked once.  Here we
3847            # test an implementation detail that is not observed across
3848            # all supported platforms (since WindowsNamedSharedMemory
3849            # manages unlinking on its own and unlink() does nothing).
3850            # True release of shared memory segment does not necessarily
3851            # happen until process exits, depending on the OS platform.
3852            name_dblunlink = self._new_shm_name('test01_dblunlink')
3853            sms_uno = shared_memory.SharedMemory(
3854                name_dblunlink,
3855                create=True,
3856                size=5000
3857            )
3858            with self.assertRaises(FileNotFoundError):
3859                try:
3860                    self.assertGreaterEqual(sms_uno.size, 5000)
3861
3862                    sms_duo = shared_memory.SharedMemory(name_dblunlink)
3863                    sms_duo.unlink()  # First shm_unlink() call.
3864                    sms_duo.close()
3865                    sms_uno.close()
3866
3867                finally:
3868                    sms_uno.unlink()  # A second shm_unlink() call is bad.
3869
3870        with self.assertRaises(FileExistsError):
3871            # Attempting to create a new shared memory segment with a
3872            # name that is already in use triggers an exception.
3873            there_can_only_be_one_sms = shared_memory.SharedMemory(
3874                name_tsmb,
3875                create=True,
3876                size=512
3877            )
3878
3879        if shared_memory._USE_POSIX:
3880            # Requesting creation of a shared memory segment with the option
3881            # to attach to an existing segment, if that name is currently in
3882            # use, should not trigger an exception.
3883            # Note:  Using a smaller size could possibly cause truncation of
3884            # the existing segment but is OS platform dependent.  In the
3885            # case of MacOS/darwin, requesting a smaller size is disallowed.
3886            class OptionalAttachSharedMemory(shared_memory.SharedMemory):
3887                _flags = os.O_CREAT | os.O_RDWR
3888            ok_if_exists_sms = OptionalAttachSharedMemory(name_tsmb)
3889            self.assertEqual(ok_if_exists_sms.size, sms.size)
3890            ok_if_exists_sms.close()
3891
3892        # Attempting to attach to an existing shared memory segment when
3893        # no segment exists with the supplied name triggers an exception.
3894        with self.assertRaises(FileNotFoundError):
3895            nonexisting_sms = shared_memory.SharedMemory('test01_notthere')
3896            nonexisting_sms.unlink()  # Error should occur on prior line.
3897
3898        sms.close()
3899
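    # The lifecycle exercised above, reduced to the documented essentials:
    #
    #   shm = shared_memory.SharedMemory(create=True, size=512)   # create
    #   other = shared_memory.SharedMemory(shm.name)              # attach
    #   other.close()        # every handle gets close()...
    #   shm.close()
    #   shm.unlink()         # ...and the segment is unlinked exactly once
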
3900    def test_shared_memory_recreate(self):
3901        # Test that the shared memory segment is created properly
3902        # when _make_filename returns an existing shared memory segment name.
3903        with unittest.mock.patch(
3904            'multiprocessing.shared_memory._make_filename') as mock_make_filename:
3905
3906            NAME_PREFIX = shared_memory._SHM_NAME_PREFIX
3907            names = ['test01_fn', 'test02_fn']
3908            # Prepend NAME_PREFIX ('/psm_' or 'wnsm_'); some POSIX-compliant
3909            # systems require the name to start with '/'.
3910            names = [NAME_PREFIX + name for name in names]
3911
3912            mock_make_filename.side_effect = names
3913            shm1 = shared_memory.SharedMemory(create=True, size=1)
3914            self.addCleanup(shm1.unlink)
3915            self.assertEqual(shm1._name, names[0])
3916
3917            mock_make_filename.side_effect = names
3918            shm2 = shared_memory.SharedMemory(create=True, size=1)
3919            self.addCleanup(shm2.unlink)
3920            self.assertEqual(shm2._name, names[1])
3921
3922    def test_invalid_shared_memory_creation(self):
3923        # Test creating a shared memory segment with negative size
3924        with self.assertRaises(ValueError):
3925            sms_invalid = shared_memory.SharedMemory(create=True, size=-1)
3926
3927        # Test creating a shared memory segment with size 0
3928        with self.assertRaises(ValueError):
3929            sms_invalid = shared_memory.SharedMemory(create=True, size=0)
3930
3931        # Test creating a shared memory segment without size argument
3932        with self.assertRaises(ValueError):
3933            sms_invalid = shared_memory.SharedMemory(create=True)
3934
3935    def test_shared_memory_pickle_unpickle(self):
3936        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
3937            with self.subTest(proto=proto):
3938                sms = shared_memory.SharedMemory(create=True, size=512)
3939                self.addCleanup(sms.unlink)
3940                sms.buf[0:6] = b'pickle'
3941
3942                # Test pickling
3943                pickled_sms = pickle.dumps(sms, protocol=proto)
3944
3945                # Test unpickling
3946                sms2 = pickle.loads(pickled_sms)
3947                self.assertIsInstance(sms2, shared_memory.SharedMemory)
3948                self.assertEqual(sms.name, sms2.name)
3949                self.assertEqual(bytes(sms.buf[0:6]), b'pickle')
3950                self.assertEqual(bytes(sms2.buf[0:6]), b'pickle')
3951
3952                # Test that unpickled version is still the same SharedMemory
3953                sms.buf[0:6] = b'newval'
3954                self.assertEqual(bytes(sms.buf[0:6]), b'newval')
3955                self.assertEqual(bytes(sms2.buf[0:6]), b'newval')
3956
3957                sms2.buf[0:6] = b'oldval'
3958                self.assertEqual(bytes(sms.buf[0:6]), b'oldval')
3959                self.assertEqual(bytes(sms2.buf[0:6]), b'oldval')
3960
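    # Pickling a SharedMemory object essentially serializes just its name;
    # unpickling attaches to the existing segment by that name.  That is
    # why both handles above observe each other's writes, and why
    # unpickling fails once the segment is unlinked (next test):
    #
    #   data = pickle.dumps(sms)    # roughly: the segment name travels
    #   sms2 = pickle.loads(data)   # re-attach by name
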
3961    def test_shared_memory_pickle_unpickle_dead_object(self):
3962        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
3963            with self.subTest(proto=proto):
3964                sms = shared_memory.SharedMemory(create=True, size=512)
3965                sms.buf[0:6] = b'pickle'
3966                pickled_sms = pickle.dumps(sms, protocol=proto)
3967
3968                # Now, we are going to kill the original object.
3969                # So, unpickled one won't be able to attach to it.
3970                sms.close()
3971                sms.unlink()
3972
3973                with self.assertRaises(FileNotFoundError):
3974                    pickle.loads(pickled_sms)
3975
3976    def test_shared_memory_across_processes(self):
3977        # bpo-40135: don't define shared memory block's name in case of
3978        # the failure when we run multiprocessing tests in parallel.
3979        sms = shared_memory.SharedMemory(create=True, size=512)
3980        self.addCleanup(sms.unlink)
3981
3982        # Verify remote attachment to existing block by name is working.
3983        p = self.Process(
3984            target=self._attach_existing_shmem_then_write,
3985            args=(sms.name, b'howdy')
3986        )
3987        p.daemon = True
3988        p.start()
3989        p.join()
3990        self.assertEqual(bytes(sms.buf[:5]), b'howdy')
3991
3992        # Verify pickling of SharedMemory instance also works.
3993        p = self.Process(
3994            target=self._attach_existing_shmem_then_write,
3995            args=(sms, b'HELLO')
3996        )
3997        p.daemon = True
3998        p.start()
3999        p.join()
4000        self.assertEqual(bytes(sms.buf[:5]), b'HELLO')
4001
4002        sms.close()
4003
4004    @unittest.skipIf(os.name != "posix", "not feasible on non-POSIX platforms")
4005    def test_shared_memory_SharedMemoryServer_ignores_sigint(self):
4006        # bpo-36368: protect SharedMemoryManager server process from
4007        # KeyboardInterrupt signals.
4008        smm = multiprocessing.managers.SharedMemoryManager()
4009        smm.start()
4010
4011        # make sure the manager works properly at the beginning
4012        sl = smm.ShareableList(range(10))
4013
4014        # The manager's server should ignore KeyboardInterrupt signals,
4015        # maintain its connection with the current process, and succeed
4016        # when asked to deliver memory segments.
4017        os.kill(smm._process.pid, signal.SIGINT)
4018
4019        sl2 = smm.ShareableList(range(10))
4020
4021        # test that the custom signal handler registered in the Manager does
4022        # not affect signal handling in the parent process.
4023        with self.assertRaises(KeyboardInterrupt):
4024            os.kill(os.getpid(), signal.SIGINT)
4025
4026        smm.shutdown()
4027
4028    @unittest.skipIf(os.name != "posix", "resource_tracker is posix only")
4029    def test_shared_memory_SharedMemoryManager_reuses_resource_tracker(self):
4030        # bpo-36867: test that a SharedMemoryManager uses the
4031        # same resource_tracker process as its parent.
4032        cmd = '''if 1:
4033            from multiprocessing.managers import SharedMemoryManager
4034
4035
4036            smm = SharedMemoryManager()
4037            smm.start()
4038            sl = smm.ShareableList(range(10))
4039            smm.shutdown()
4040        '''
4041        rc, out, err = test.support.script_helper.assert_python_ok('-c', cmd)
4042
4043        # Before bpo-36867 was fixed, a SharedMemoryManager not using the same
4044        # resource_tracker process as its parent would make the parent's
4045        # tracker complain about sl being leaked even though smm.shutdown()
4046        # properly released sl.
4047        self.assertFalse(err)
4048
4049    def test_shared_memory_SharedMemoryManager_basics(self):
4050        smm1 = multiprocessing.managers.SharedMemoryManager()
4051        with self.assertRaises(ValueError):
4052            smm1.SharedMemory(size=9)  # Fails if SharedMemoryServer not started
4053        smm1.start()
4054        lol = [ smm1.ShareableList(range(i)) for i in range(5, 10) ]
4055        lom = [ smm1.SharedMemory(size=j) for j in range(32, 128, 16) ]
4056        doppleganger_list0 = shared_memory.ShareableList(name=lol[0].shm.name)
4057        self.assertEqual(len(doppleganger_list0), 5)
4058        doppleganger_shm0 = shared_memory.SharedMemory(name=lom[0].name)
4059        self.assertGreaterEqual(len(doppleganger_shm0.buf), 32)
4060        held_name = lom[0].name
4061        smm1.shutdown()
4062        if sys.platform != "win32":
4063            # Calls to unlink() have no effect on Windows platform; shared
4064            # memory will only be released once final process exits.
4065            with self.assertRaises(FileNotFoundError):
4066                # No longer there to be attached to again.
4067                absent_shm = shared_memory.SharedMemory(name=held_name)
4068
4069        with multiprocessing.managers.SharedMemoryManager() as smm2:
4070            sl = smm2.ShareableList("howdy")
4071            shm = smm2.SharedMemory(size=128)
4072            held_name = sl.shm.name
4073        if sys.platform != "win32":
4074            with self.assertRaises(FileNotFoundError):
4075                # No longer there to be attached to again.
4076                absent_sl = shared_memory.ShareableList(name=held_name)
4077
4078
4079    def test_shared_memory_ShareableList_basics(self):
4080        sl = shared_memory.ShareableList(
4081            ['howdy', b'HoWdY', -273.154, 100, None, True, 42]
4082        )
4083        self.addCleanup(sl.shm.unlink)
4084
4085        # Verify __repr__
4086        self.assertIn(sl.shm.name, str(sl))
4087        self.assertIn(str(list(sl)), str(sl))
4088
4089        # Index Out of Range (get)
4090        with self.assertRaises(IndexError):
4091            sl[7]
4092
4093        # Index Out of Range (set)
4094        with self.assertRaises(IndexError):
4095            sl[7] = 2
4096
4097        # Assign value without format change (str -> str)
4098        current_format = sl._get_packing_format(0)
4099        sl[0] = 'howdy'
4100        self.assertEqual(current_format, sl._get_packing_format(0))
4101
4102        # Verify attributes are readable.
4103        self.assertEqual(sl.format, '8s8sdqxxxxxx?xxxxxxxx?q')
4104
4105        # Exercise len().
4106        self.assertEqual(len(sl), 7)
4107
4108        # Exercise index().
4109        with warnings.catch_warnings():
4110            # Suppress BytesWarning when comparing against b'HoWdY'.
4111            warnings.simplefilter('ignore')
4112            with self.assertRaises(ValueError):
4113                sl.index('100')
4114            self.assertEqual(sl.index(100), 3)
4115
4116        # Exercise retrieving individual values.
4117        self.assertEqual(sl[0], 'howdy')
4118        self.assertEqual(sl[-2], True)
4119
4120        # Exercise iterability.
4121        self.assertEqual(
4122            tuple(sl),
4123            ('howdy', b'HoWdY', -273.154, 100, None, True, 42)
4124        )
4125
4126        # Exercise modifying individual values.
4127        sl[3] = 42
4128        self.assertEqual(sl[3], 42)
4129        sl[4] = 'some'  # Change type at a given position.
4130        self.assertEqual(sl[4], 'some')
4131        self.assertEqual(sl.format, '8s8sdq8sxxxxxxx?q')
4132        with self.assertRaisesRegex(ValueError,
4133                                    "exceeds available storage"):
4134            sl[4] = 'far too many'
4135        self.assertEqual(sl[4], 'some')
4136        sl[0] = 'encodés'  # Exactly 8 bytes of UTF-8 data
4137        self.assertEqual(sl[0], 'encodés')
4138        self.assertEqual(sl[1], b'HoWdY')  # no spillage
4139        with self.assertRaisesRegex(ValueError,
4140                                    "exceeds available storage"):
4141            sl[0] = 'encodées'  # Exactly 9 bytes of UTF-8 data
4142        self.assertEqual(sl[1], b'HoWdY')
4143        with self.assertRaisesRegex(ValueError,
4144                                    "exceeds available storage"):
4145            sl[1] = b'123456789'
4146        self.assertEqual(sl[1], b'HoWdY')
4147
4148        # Exercise count().
4149        with warnings.catch_warnings():
4150            # Suppress BytesWarning when comparing against b'HoWdY'.
4151            warnings.simplefilter('ignore')
4152            self.assertEqual(sl.count(42), 2)
4153            self.assertEqual(sl.count(b'HoWdY'), 1)
4154            self.assertEqual(sl.count(b'adios'), 0)
4155
4156        # Exercise creating a duplicate.
4157        name_duplicate = self._new_shm_name('test03_duplicate')
4158        sl_copy = shared_memory.ShareableList(sl, name=name_duplicate)
4159        try:
4160            self.assertNotEqual(sl.shm.name, sl_copy.shm.name)
4161            self.assertEqual(name_duplicate, sl_copy.shm.name)
4162            self.assertEqual(list(sl), list(sl_copy))
4163            self.assertEqual(sl.format, sl_copy.format)
4164            sl_copy[-1] = 77
4165            self.assertEqual(sl_copy[-1], 77)
4166            self.assertNotEqual(sl[-1], 77)
4167            sl_copy.shm.close()
4168        finally:
4169            sl_copy.shm.unlink()
4170
4171        # Obtain a second handle on the same ShareableList.
4172        sl_tethered = shared_memory.ShareableList(name=sl.shm.name)
4173        self.assertEqual(sl.shm.name, sl_tethered.shm.name)
4174        sl_tethered[-1] = 880
4175        self.assertEqual(sl[-1], 880)
4176        sl_tethered.shm.close()
4177
4178        sl.shm.close()
4179
4180        # Exercise creating an empty ShareableList.
4181        empty_sl = shared_memory.ShareableList()
4182        try:
4183            self.assertEqual(len(empty_sl), 0)
4184            self.assertEqual(empty_sl.format, '')
4185            self.assertEqual(empty_sl.count('any'), 0)
4186            with self.assertRaises(ValueError):
4187                empty_sl.index(None)
4188            empty_sl.shm.close()
4189        finally:
4190            empty_sl.shm.unlink()
4191
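    # ShareableList storage is allocated once, at creation time: each slot
    # keeps the byte width it was created with (hence the "exceeds
    # available storage" errors above), and the struct format string only
    # changes when a slot's type changes.  Sketch:
    #
    #   sl = shared_memory.ShareableList(['howdy'])  # slot rounded up to 8 bytes
    #   sl[0] = 'whoopee!'       # 8 bytes still fit
    #   sl[0] = 'far too large'  # ValueError: exceeds available storage
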
4192    def test_shared_memory_ShareableList_pickling(self):
4193        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
4194            with self.subTest(proto=proto):
4195                sl = shared_memory.ShareableList(range(10))
4196                self.addCleanup(sl.shm.unlink)
4197
4198                serialized_sl = pickle.dumps(sl, protocol=proto)
4199                deserialized_sl = pickle.loads(serialized_sl)
4200                self.assertIsInstance(
4201                    deserialized_sl, shared_memory.ShareableList)
4202                self.assertEqual(deserialized_sl[-1], 9)
4203                self.assertIsNot(sl, deserialized_sl)
4204
4205                deserialized_sl[4] = "changed"
4206                self.assertEqual(sl[4], "changed")
4207                sl[3] = "newvalue"
4208                self.assertEqual(deserialized_sl[3], "newvalue")
4209
4210                larger_sl = shared_memory.ShareableList(range(400))
4211                self.addCleanup(larger_sl.shm.unlink)
4212                serialized_larger_sl = pickle.dumps(larger_sl, protocol=proto)
4213                self.assertEqual(len(serialized_sl), len(serialized_larger_sl))
4214                larger_sl.shm.close()
4215
4216                deserialized_sl.shm.close()
4217                sl.shm.close()
4218
4219    def test_shared_memory_ShareableList_pickling_dead_object(self):
4220        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
4221            with self.subTest(proto=proto):
4222                sl = shared_memory.ShareableList(range(10))
4223                serialized_sl = pickle.dumps(sl, protocol=proto)
4224
4225                # Now, we are going to kill the original object.
4226                # So, unpickled one won't be able to attach to it.
4227                sl.shm.close()
4228                sl.shm.unlink()
4229
4230                with self.assertRaises(FileNotFoundError):
4231                    pickle.loads(serialized_sl)
4232
4233    def test_shared_memory_cleaned_after_process_termination(self):
4234        cmd = '''if 1:
4235            import os, time, sys
4236            from multiprocessing import shared_memory
4237
4238            # Create a shared_memory segment, and send the segment name
4239            sm = shared_memory.SharedMemory(create=True, size=10)
4240            sys.stdout.write(sm.name + '\\n')
4241            sys.stdout.flush()
4242            time.sleep(100)
4243        '''
4244        with subprocess.Popen([sys.executable, '-E', '-c', cmd],
4245                              stdout=subprocess.PIPE,
4246                              stderr=subprocess.PIPE) as p:
4247            name = p.stdout.readline().strip().decode()
4248
4249            # Abruptly terminating a process holding a reference to a shared
4250            # memory segment should not leak that segment.
4251            p.terminate()
4252            p.wait()
4253
4254            deadline = time.monotonic() + support.LONG_TIMEOUT
4255            t = 0.1
4256            while time.monotonic() < deadline:
4257                time.sleep(t)
4258                t = min(t*2, 5)
4259                try:
4260                    smm = shared_memory.SharedMemory(name, create=False)
4261                except FileNotFoundError:
4262                    break
4263            else:
4264                raise AssertionError("A SharedMemory segment was leaked after"
4265                                     " a process was abruptly terminated.")
4266
4267            if os.name == 'posix':
4268                # Without this line it was raising warnings like:
4269                #   UserWarning: resource_tracker:
4270                #   There appear to be 1 leaked shared_memory
4271                #   objects to clean up at shutdown
4272                # See: https://bugs.python.org/issue45209
4273                resource_tracker.unregister(f"/{name}", "shared_memory")
4274
4275                # A warning was emitted by the subprocess' own
4276                # resource_tracker (on Windows, shared memory segments
4277                # are released automatically by the OS).
4278                err = p.stderr.read().decode()
4279                self.assertIn(
4280                    "resource_tracker: There appear to be 1 leaked "
4281                    "shared_memory objects to clean up at shutdown", err)
4282
4283#
4284# Test to verify that `Finalize` works.
4285#
4286
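# util.Finalize is private API with roughly the signature
# util.Finalize(obj, callback, args=(), kwargs={}, exitpriority=None): the
# callback runs when obj is garbage-collected, when the finalizer object is
# called directly, or -- if exitpriority is not None -- at interpreter
# exit, higher priorities first.  A minimal sketch:
#
#   f = util.Finalize(None, print, args=('bye',), exitpriority=0)
#   f()     # prints 'bye' once; later calls and exit handling do nothing
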
4287class _TestFinalize(BaseTestCase):
4288
4289    ALLOWED_TYPES = ('processes',)
4290
4291    def setUp(self):
4292        self.registry_backup = util._finalizer_registry.copy()
4293        util._finalizer_registry.clear()
4294
4295    def tearDown(self):
4296        gc.collect()  # For PyPy or other GCs.
4297        self.assertFalse(util._finalizer_registry)
4298        util._finalizer_registry.update(self.registry_backup)
4299
4300    @classmethod
4301    def _test_finalize(cls, conn):
4302        class Foo(object):
4303            pass
4304
4305        a = Foo()
4306        util.Finalize(a, conn.send, args=('a',))
4307        del a           # triggers callback for a
4308        gc.collect()  # For PyPy or other GCs.
4309
4310        b = Foo()
4311        close_b = util.Finalize(b, conn.send, args=('b',))
4312        close_b()       # triggers callback for b
4313        close_b()       # does nothing because callback has already been called
4314        del b           # does nothing because callback has already been called
4315        gc.collect()  # For PyPy or other GCs.
4316
4317        c = Foo()
4318        util.Finalize(c, conn.send, args=('c',))
4319
4320        d10 = Foo()
4321        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
4322
4323        d01 = Foo()
4324        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
4325        d02 = Foo()
4326        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
4327        d03 = Foo()
4328        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
4329
4330        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
4331
4332        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
4333
4334        # call multiprocessing's cleanup function then exit process without
4335        # garbage collecting locals
4336        util._exit_function()
4337        conn.close()
4338        os._exit(0)
4339
4340    def test_finalize(self):
4341        conn, child_conn = self.Pipe()
4342
4343        p = self.Process(target=self._test_finalize, args=(child_conn,))
4344        p.daemon = True
4345        p.start()
4346        p.join()
4347
4348        result = [obj for obj in iter(conn.recv, 'STOP')]
4349        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
4350
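    # The expected order follows from util._exit_function(): finalizers
    # with a non-None exitpriority run highest priority first, ties in
    # reverse registration order -- d10 (1), then d03/d02/d01 (0), then e
    # (-10) and the STOP sentinel (-100).  c was registered with
    # exitpriority=None, so it would only run on garbage collection and
    # never fires here.
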
4351    def test_thread_safety(self):
4352        # bpo-24484: _run_finalizers() should be thread-safe
4353        def cb():
4354            pass
4355
4356        class Foo(object):
4357            def __init__(self):
4358                self.ref = self  # create reference cycle
4359                # insert finalizer at random key
4360                util.Finalize(self, cb, exitpriority=random.randint(1, 100))
4361
4362        finish = False
4363        exc = None
4364
4365        def run_finalizers():
4366            nonlocal exc
4367            while not finish:
4368                time.sleep(random.random() * 1e-1)
4369                try:
4370                    # A GC run will eventually happen during this,
4371                    # collecting stale Foo's and mutating the registry
4372                    util._run_finalizers()
4373                except Exception as e:
4374                    exc = e
4375
4376        def make_finalizers():
4377            nonlocal exc
4378            d = {}
4379            while not finish:
4380                try:
4381                    # Old Foo's get gradually replaced and later
4382                    # collected by the GC (because of the cyclic ref)
4383                    d[random.getrandbits(5)] = {Foo() for i in range(10)}
4384                except Exception as e:
4385                    exc = e
4386                    d.clear()
4387
4388        old_interval = sys.getswitchinterval()
4389        old_threshold = gc.get_threshold()
4390        try:
4391            sys.setswitchinterval(1e-6)
4392            gc.set_threshold(5, 5, 5)
4393            threads = [threading.Thread(target=run_finalizers),
4394                       threading.Thread(target=make_finalizers)]
4395            with threading_helper.start_threads(threads):
4396                time.sleep(4.0)  # Wait a bit to trigger race condition
4397                finish = True
4398            if exc is not None:
4399                raise exc
4400        finally:
4401            sys.setswitchinterval(old_interval)
4402            gc.set_threshold(*old_threshold)
4403            gc.collect()  # Collect remaining Foo's
4404
4405
4406#
4407# Test that from ... import * works for each module
4408#
4409
4410class _TestImportStar(unittest.TestCase):
4411
4412    def get_module_names(self):
4413        import glob
4414        folder = os.path.dirname(multiprocessing.__file__)
4415        pattern = os.path.join(glob.escape(folder), '*.py')
4416        files = glob.glob(pattern)
4417        modules = [os.path.splitext(os.path.split(f)[1])[0] for f in files]
4418        modules = ['multiprocessing.' + m for m in modules]
4419        modules.remove('multiprocessing.__init__')
4420        modules.append('multiprocessing')
4421        return modules
4422
4423    def test_import(self):
4424        modules = self.get_module_names()
4425        if sys.platform == 'win32':
4426            modules.remove('multiprocessing.popen_fork')
4427            modules.remove('multiprocessing.popen_forkserver')
4428            modules.remove('multiprocessing.popen_spawn_posix')
4429        else:
4430            modules.remove('multiprocessing.popen_spawn_win32')
4431            if not HAS_REDUCTION:
4432                modules.remove('multiprocessing.popen_forkserver')
4433
4434        if c_int is None:
4435            # This module requires _ctypes
4436            modules.remove('multiprocessing.sharedctypes')
4437
4438        for name in modules:
4439            __import__(name)
4440            mod = sys.modules[name]
4441            self.assertTrue(hasattr(mod, '__all__'), name)
4442
4443            for attr in mod.__all__:
4444                self.assertTrue(
4445                    hasattr(mod, attr),
4446                    '%r does not have attribute %r' % (mod, attr)
4447                    )
4448
4449#
4450# Quick test that logging works -- does not test logging output
4451#
4452
4453class _TestLogging(BaseTestCase):
4454
4455    ALLOWED_TYPES = ('processes',)
4456
4457    def test_enable_logging(self):
4458        logger = multiprocessing.get_logger()
4459        logger.setLevel(util.SUBWARNING)
4460        self.assertIsNotNone(logger)
4461        logger.debug('this will not be printed')
4462        logger.info('nor will this')
4463        logger.setLevel(LOG_LEVEL)
4464
4465    @classmethod
4466    def _test_level(cls, conn):
4467        logger = multiprocessing.get_logger()
4468        conn.send(logger.getEffectiveLevel())
4469
4470    def test_level(self):
4471        LEVEL1 = 32
4472        LEVEL2 = 37
4473
4474        logger = multiprocessing.get_logger()
4475        root_logger = logging.getLogger()
4476        root_level = root_logger.level
4477
4478        reader, writer = multiprocessing.Pipe(duplex=False)
4479
4480        logger.setLevel(LEVEL1)
4481        p = self.Process(target=self._test_level, args=(writer,))
4482        p.start()
4483        self.assertEqual(LEVEL1, reader.recv())
4484        p.join()
4485        p.close()
4486
4487        logger.setLevel(logging.NOTSET)
4488        root_logger.setLevel(LEVEL2)
4489        p = self.Process(target=self._test_level, args=(writer,))
4490        p.start()
4491        self.assertEqual(LEVEL2, reader.recv())
4492        p.join()
4493        p.close()
4494
4495        root_logger.setLevel(root_level)
4496        logger.setLevel(level=LOG_LEVEL)
4497
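    # get_logger() returns the package-level "multiprocessing" logger; with
    # its level left at NOTSET it defers to the root logger, which is what
    # the second half of test_level checks.  Typical documented usage:
    #
    #   logger = multiprocessing.log_to_stderr()
    #   logger.setLevel(logging.INFO)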
4498
4499# class _TestLoggingProcessName(BaseTestCase):
4500#
4501#     def handle(self, record):
4502#         assert record.processName == multiprocessing.current_process().name
4503#         self.__handled = True
4504#
4505#     def test_logging(self):
4506#         handler = logging.Handler()
4507#         handler.handle = self.handle
4508#         self.__handled = False
4509#         # Bypass getLogger() and side-effects
4510#         logger = logging.getLoggerClass()(
4511#                 'multiprocessing.test.TestLoggingProcessName')
4512#         logger.addHandler(handler)
4513#         logger.propagate = False
4514#
4515#         logger.warn('foo')
4516#         assert self.__handled
4517
4518#
4519# Check that Process.join() retries if os.waitpid() fails with EINTR
4520#
4521
4522class _TestPollEintr(BaseTestCase):
4523
4524    ALLOWED_TYPES = ('processes',)
4525
4526    @classmethod
4527    def _killer(cls, pid):
4528        time.sleep(0.1)
4529        os.kill(pid, signal.SIGUSR1)
4530
4531    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
4532    def test_poll_eintr(self):
4533        got_signal = [False]
4534        def record(*args):
4535            got_signal[0] = True
4536        pid = os.getpid()
4537        oldhandler = signal.signal(signal.SIGUSR1, record)
4538        try:
4539            killer = self.Process(target=self._killer, args=(pid,))
4540            killer.start()
4541            try:
4542                p = self.Process(target=time.sleep, args=(2,))
4543                p.start()
4544                p.join()
4545            finally:
4546                killer.join()
4547            self.assertTrue(got_signal[0])
4548            self.assertEqual(p.exitcode, 0)
4549        finally:
4550            signal.signal(signal.SIGUSR1, oldhandler)
4551
4552#
4553# Test to verify handle verification, see issue 3321
4554#
4555
4556class TestInvalidHandle(unittest.TestCase):
4557
4558    @unittest.skipIf(WIN32, "skipped on Windows")
4559    def test_invalid_handles(self):
4560        conn = multiprocessing.connection.Connection(44977608)
4561        # check that poll() doesn't crash
4562        try:
4563            conn.poll()
4564        except (ValueError, OSError):
4565            pass
4566        finally:
4567            # Hack private attribute _handle to avoid printing an error
4568            # in conn.__del__
4569            conn._handle = None
4570        self.assertRaises((ValueError, OSError),
4571                          multiprocessing.connection.Connection, -1)
4572
4573
4574
4575@hashlib_helper.requires_hashdigest('md5')
4576class OtherTest(unittest.TestCase):
4577    # TODO: add more tests for deliver/answer challenge.
4578    def test_deliver_challenge_auth_failure(self):
4579        class _FakeConnection(object):
4580            def recv_bytes(self, size):
4581                return b'something bogus'
4582            def send_bytes(self, data):
4583                pass
4584        self.assertRaises(multiprocessing.AuthenticationError,
4585                          multiprocessing.connection.deliver_challenge,
4586                          _FakeConnection(), b'abc')
4587
4588    def test_answer_challenge_auth_failure(self):
4589        class _FakeConnection(object):
4590            def __init__(self):
4591                self.count = 0
4592            def recv_bytes(self, size):
4593                self.count += 1
4594                if self.count == 1:
4595                    return multiprocessing.connection.CHALLENGE
4596                elif self.count == 2:
4597                    return b'something bogus'
4598                return b''
4599            def send_bytes(self, data):
4600                pass
4601        self.assertRaises(multiprocessing.AuthenticationError,
4602                          multiprocessing.connection.answer_challenge,
4603                          _FakeConnection(), b'abc')
4604
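# For reference, the challenge/response handshake exercised above is,
# schematically (hence the requires_hashdigest('md5') guard on the class):
#
#   server: send CHALLENGE + 20 random bytes
#   client: reply with hmac.new(authkey, message, 'md5').digest()
#   server: compare digests and send WELCOME on success, FAILURE otherwise
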
4605#
4606# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
4607#
4608
4609def initializer(ns):
4610    ns.test += 1
4611
4612@hashlib_helper.requires_hashdigest('md5')
4613class TestInitializers(unittest.TestCase):
4614    def setUp(self):
4615        self.mgr = multiprocessing.Manager()
4616        self.ns = self.mgr.Namespace()
4617        self.ns.test = 0
4618
4619    def tearDown(self):
4620        self.mgr.shutdown()
4621        self.mgr.join()
4622
4623    def test_manager_initializer(self):
4624        m = multiprocessing.managers.SyncManager()
4625        self.assertRaises(TypeError, m.start, 1)
4626        m.start(initializer, (self.ns,))
4627        self.assertEqual(self.ns.test, 1)
4628        m.shutdown()
4629        m.join()
4630
4631    def test_pool_initializer(self):
4632        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
4633        p = multiprocessing.Pool(1, initializer, (self.ns,))
4634        p.close()
4635        p.join()
4636        self.assertEqual(self.ns.test, 1)
4637
4638#
4639# Issue 5155, 5313, 5331: Test process in processes
4640# Verifies os.close(sys.stdin.fileno()) vs. sys.stdin.close() behavior
4641#
4642
4643def _this_sub_process(q):
4644    try:
4645        item = q.get(block=False)
4646    except pyqueue.Empty:
4647        pass
4648
4649def _test_process():
4650    queue = multiprocessing.Queue()
4651    subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,))
4652    subProc.daemon = True
4653    subProc.start()
4654    subProc.join()
4655
4656def _afunc(x):
4657    return x*x
4658
4659def pool_in_process():
4660    pool = multiprocessing.Pool(processes=4)
4661    x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
4662    pool.close()
4663    pool.join()
4664
4665class _file_like(object):
4666    def __init__(self, delegate):
4667        self._delegate = delegate
4668        self._pid = None
4669
4670    @property
4671    def cache(self):
4672        pid = os.getpid()
4673        # There are no race conditions since fork keeps only the running thread
4674        if pid != self._pid:
4675            self._pid = pid
4676            self._cache = []
4677        return self._cache
4678
4679    def write(self, data):
4680        self.cache.append(data)
4681
4682    def flush(self):
4683        self._delegate.write(''.join(self.cache))
4684        self._cache = []
4685
4686class TestStdinBadfiledescriptor(unittest.TestCase):
4687
4688    def test_queue_in_process(self):
4689        proc = multiprocessing.Process(target=_test_process)
4690        proc.start()
4691        proc.join()
4692
4693    def test_pool_in_process(self):
4694        p = multiprocessing.Process(target=pool_in_process)
4695        p.start()
4696        p.join()
4697
4698    def test_flushing(self):
4699        sio = io.StringIO()
4700        flike = _file_like(sio)
4701        flike.write('foo')
4702        proc = multiprocessing.Process(target=lambda: flike.flush())
4703        flike.flush()
4704        assert sio.getvalue() == 'foo'
4705
4706
4707class TestWait(unittest.TestCase):
4708
4709    @classmethod
4710    def _child_test_wait(cls, w, slow):
4711        for i in range(10):
4712            if slow:
4713                time.sleep(random.random()*0.1)
4714            w.send((i, os.getpid()))
4715        w.close()
4716
4717    def test_wait(self, slow=False):
4718        from multiprocessing.connection import wait
4719        readers = []
4720        procs = []
4721        messages = []
4722
4723        for i in range(4):
4724            r, w = multiprocessing.Pipe(duplex=False)
4725            p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow))
4726            p.daemon = True
4727            p.start()
4728            w.close()
4729            readers.append(r)
4730            procs.append(p)
4731            self.addCleanup(p.join)
4732
4733        while readers:
4734            for r in wait(readers):
4735                try:
4736                    msg = r.recv()
4737                except EOFError:
4738                    readers.remove(r)
4739                    r.close()
4740                else:
4741                    messages.append(msg)
4742
4743        messages.sort()
4744        expected = sorted((i, p.pid) for i in range(10) for p in procs)
4745        self.assertEqual(messages, expected)
4746
4747    @classmethod
4748    def _child_test_wait_socket(cls, address, slow):
4749        s = socket.socket()
4750        s.connect(address)
4751        for i in range(10):
4752            if slow:
4753                time.sleep(random.random()*0.1)
4754            s.sendall(('%s\n' % i).encode('ascii'))
4755        s.close()
4756
4757    def test_wait_socket(self, slow=False):
4758        from multiprocessing.connection import wait
4759        l = socket.create_server((socket_helper.HOST, 0))
4760        addr = l.getsockname()
4761        readers = []
4762        procs = []
4763        dic = {}
4764
4765        for i in range(4):
4766            p = multiprocessing.Process(target=self._child_test_wait_socket,
4767                                        args=(addr, slow))
4768            p.daemon = True
4769            p.start()
4770            procs.append(p)
4771            self.addCleanup(p.join)
4772
4773        for i in range(4):
4774            r, _ = l.accept()
4775            readers.append(r)
4776            dic[r] = []
4777        l.close()
4778
4779        while readers:
4780            for r in wait(readers):
4781                msg = r.recv(32)
4782                if not msg:
4783                    readers.remove(r)
4784                    r.close()
4785                else:
4786                    dic[r].append(msg)
4787
4788        expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
4789        for v in dic.values():
4790            self.assertEqual(b''.join(v), expected)
4791
4792    def test_wait_slow(self):
4793        self.test_wait(True)
4794
4795    def test_wait_socket_slow(self):
4796        self.test_wait_socket(True)
4797
4798    def test_wait_timeout(self):
4799        from multiprocessing.connection import wait
4800
4801        expected = 5
4802        a, b = multiprocessing.Pipe()
4803
4804        start = time.monotonic()
4805        res = wait([a, b], expected)
4806        delta = time.monotonic() - start
4807
4808        self.assertEqual(res, [])
4809        self.assertLess(delta, expected * 2)
4810        self.assertGreater(delta, expected * 0.5)
4811
4812        b.send(None)
4813
4814        start = time.monotonic()
4815        res = wait([a, b], 20)
4816        delta = time.monotonic() - start
4817
4818        self.assertEqual(res, [a])
4819        self.assertLess(delta, 0.4)
4820
4821    @classmethod
4822    def signal_and_sleep(cls, sem, period):
4823        sem.release()
4824        time.sleep(period)
4825
4826    def test_wait_integer(self):
4827        from multiprocessing.connection import wait
4828
4829        expected = 3
4830        sorted_ = lambda l: sorted(l, key=lambda x: id(x))
4831        sem = multiprocessing.Semaphore(0)
4832        a, b = multiprocessing.Pipe()
4833        p = multiprocessing.Process(target=self.signal_and_sleep,
4834                                    args=(sem, expected))
4835
4836        p.start()
4837        self.assertIsInstance(p.sentinel, int)
4838        self.assertTrue(sem.acquire(timeout=20))
4839
4840        start = time.monotonic()
4841        res = wait([a, p.sentinel, b], expected + 20)
4842        delta = time.monotonic() - start
4843
4844        self.assertEqual(res, [p.sentinel])
4845        self.assertLess(delta, expected + 2)
4846        self.assertGreater(delta, expected - 2)
4847
4848        a.send(None)
4849
4850        start = time.monotonic()
4851        res = wait([a, p.sentinel, b], 20)
4852        delta = time.monotonic() - start
4853
4854        self.assertEqual(sorted_(res), sorted_([p.sentinel, b]))
4855        self.assertLess(delta, 0.4)
4856
4857        b.send(None)
4858
4859        start = time.monotonic()
4860        res = wait([a, p.sentinel, b], 20)
4861        delta = time.monotonic() - start
4862
4863        self.assertEqual(sorted_(res), sorted_([a, p.sentinel, b]))
4864        self.assertLess(delta, 0.4)
4865
4866        p.terminate()
4867        p.join()
4868
4869    def test_neg_timeout(self):
4870        from multiprocessing.connection import wait
4871        a, b = multiprocessing.Pipe()
4872        t = time.monotonic()
4873        res = wait([a], timeout=-1)
4874        t = time.monotonic() - t
4875        self.assertEqual(res, [])
4876        self.assertLess(t, 1)
4877        a.close()
4878        b.close()
4879
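# For reference, the canonical wait() pattern the tests above exercise, as a
# sketch (not run here): multiplex several Connection objects until every
# peer has closed its end.  The helper name is illustrative only.
def _example_drain_with_wait(connections):
    results = []
    while connections:
        # wait() returns the subset of objects that are ready to read
        for ready in wait(connections):
            try:
                results.append(ready.recv())
            except EOFError:
                # peer closed its end; stop polling this connection
                connections.remove(ready)
                ready.close()
    return results
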
#
# Issue 14151: Test invalid family on invalid environment
#

class TestInvalidFamily(unittest.TestCase):

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_family(self):
        with self.assertRaises(ValueError):
            multiprocessing.connection.Listener(r'\\.\test')

    @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
    def test_invalid_family_win32(self):
        with self.assertRaises(ValueError):
            multiprocessing.connection.Listener('/var/test.pipe')

#
# Issue 12098: check sys.flags of child matches that for parent
#

class TestFlags(unittest.TestCase):
    @classmethod
    def run_in_grandchild(cls, conn):
        conn.send(tuple(sys.flags))

    @classmethod
    def run_in_child(cls):
        import json
        r, w = multiprocessing.Pipe(duplex=False)
        p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,))
        p.start()
        grandchild_flags = r.recv()
        p.join()
        r.close()
        w.close()
        flags = (tuple(sys.flags), grandchild_flags)
        print(json.dumps(flags))

    def test_flags(self):
        import json
        # start child process using unusual flags
        prog = ('from test._test_multiprocessing import TestFlags; ' +
                'TestFlags.run_in_child()')
        data = subprocess.check_output(
            [sys.executable, '-E', '-S', '-O', '-c', prog])
        child_flags, grandchild_flags = json.loads(data.decode('ascii'))
        self.assertEqual(child_flags, grandchild_flags)

#
# Test interaction with socket timeouts - see Issue #6056
#

class TestTimeouts(unittest.TestCase):
    @classmethod
    def _test_timeout(cls, child, address):
        time.sleep(1)
        child.send(123)
        child.close()
        conn = multiprocessing.connection.Client(address)
        conn.send(456)
        conn.close()

    def test_timeout(self):
        old_timeout = socket.getdefaulttimeout()
        try:
            socket.setdefaulttimeout(0.1)
            parent, child = multiprocessing.Pipe(duplex=True)
            l = multiprocessing.connection.Listener(family='AF_INET')
            p = multiprocessing.Process(target=self._test_timeout,
                                        args=(child, l.address))
            p.start()
            child.close()
            self.assertEqual(parent.recv(), 123)
            parent.close()
            conn = l.accept()
            self.assertEqual(conn.recv(), 456)
            conn.close()
            l.close()
            join_process(p)
        finally:
            socket.setdefaulttimeout(old_timeout)

#
# Test what happens with no "if __name__ == '__main__'"
#

class TestNoForkBomb(unittest.TestCase):
    def test_noforkbomb(self):
        sm = multiprocessing.get_start_method()
        name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
        if sm != 'fork':
            rc, out, err = test.support.script_helper.assert_python_failure(name, sm)
            self.assertEqual(out, b'')
            self.assertIn(b'RuntimeError', err)
        else:
            rc, out, err = test.support.script_helper.assert_python_ok(name, sm)
            self.assertEqual(out.rstrip(), b'123')
            self.assertEqual(err, b'')

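# For reference, a sketch of the guard that mp_fork_bomb.py deliberately
# omits: under the 'spawn' and 'forkserver' start methods the main module is
# re-imported in each child, so process creation must be protected (the
# names below are illustrative):
#
#     def main():
#         p = multiprocessing.Process(target=worker)
#         p.start()
#         p.join()
#
#     if __name__ == '__main__':
#         main()
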
#
# Issue #17555: ForkAwareThreadLock
#

class TestForkAwareThreadLock(unittest.TestCase):
    # We recursively start processes.  Issue #17555 meant that the
    # after-fork registry would get duplicate entries for the same
    # lock.  The size of the registry at generation n was ~2**n.

    @classmethod
    def child(cls, n, conn):
        if n > 1:
            p = multiprocessing.Process(target=cls.child, args=(n-1, conn))
            p.start()
            conn.close()
            join_process(p)
        else:
            conn.send(len(util._afterfork_registry))
        conn.close()

    def test_lock(self):
        r, w = multiprocessing.Pipe(False)
        l = util.ForkAwareThreadLock()
        old_size = len(util._afterfork_registry)
        p = multiprocessing.Process(target=self.child, args=(5, w))
        p.start()
        w.close()
        new_size = r.recv()
        join_process(p)
        self.assertLessEqual(new_size, old_size)

#
# Check that non-forked child processes do not inherit unneeded fds/handles
#

class TestCloseFds(unittest.TestCase):

    def get_high_socket_fd(self):
        if WIN32:
            # The child process will not have any socket handles, so
            # calling socket.fromfd() should produce WSAENOTSOCK even
            # if there is a handle of the same number.
            return socket.socket().detach()
        else:
            # We want to produce a socket with an fd high enough that a
            # freshly created child process will not have any fds as high.
            fd = socket.socket().detach()
            to_close = []
            while fd < 50:
                to_close.append(fd)
                fd = os.dup(fd)
            for x in to_close:
                os.close(x)
            return fd

    def close(self, fd):
        if WIN32:
            socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=fd).close()
        else:
            os.close(fd)

    @classmethod
    def _test_closefds(cls, conn, fd):
        try:
            s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
        except Exception as e:
            conn.send(e)
        else:
            s.close()
            conn.send(None)

    def test_closefd(self):
        if not HAS_REDUCTION:
            raise unittest.SkipTest('requires fd pickling')

        reader, writer = multiprocessing.Pipe()
        fd = self.get_high_socket_fd()
        try:
            p = multiprocessing.Process(target=self._test_closefds,
                                        args=(writer, fd))
            p.start()
            writer.close()
            e = reader.recv()
            join_process(p)
        finally:
            self.close(fd)
            writer.close()
            reader.close()

        if multiprocessing.get_start_method() == 'fork':
            self.assertIs(e, None)
        else:
            WSAENOTSOCK = 10038
            self.assertIsInstance(e, OSError)
            self.assertTrue(e.errno == errno.EBADF or
                            e.winerror == WSAENOTSOCK, e)

#
# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
#

class TestIgnoreEINTR(unittest.TestCase):

    # Sending CONN_MAX_SIZE bytes into a multiprocessing pipe must block
    CONN_MAX_SIZE = max(support.PIPE_MAX_SIZE, support.SOCK_MAX_SIZE)

    @classmethod
    def _test_ignore(cls, conn):
        def handler(signum, frame):
            pass
        signal.signal(signal.SIGUSR1, handler)
        conn.send('ready')
        x = conn.recv()
        conn.send(x)
        conn.send_bytes(b'x' * cls.CONN_MAX_SIZE)

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_ignore(self):
        conn, child_conn = multiprocessing.Pipe()
        try:
            p = multiprocessing.Process(target=self._test_ignore,
                                        args=(child_conn,))
            p.daemon = True
            p.start()
            child_conn.close()
            self.assertEqual(conn.recv(), 'ready')
            time.sleep(0.1)
            os.kill(p.pid, signal.SIGUSR1)
            time.sleep(0.1)
            conn.send(1234)
            self.assertEqual(conn.recv(), 1234)
            time.sleep(0.1)
            os.kill(p.pid, signal.SIGUSR1)
            self.assertEqual(conn.recv_bytes(), b'x' * self.CONN_MAX_SIZE)
            time.sleep(0.1)
            p.join()
        finally:
            conn.close()

    @classmethod
    def _test_ignore_listener(cls, conn):
        def handler(signum, frame):
            pass
        signal.signal(signal.SIGUSR1, handler)
        with multiprocessing.connection.Listener() as l:
            conn.send(l.address)
            a = l.accept()
            a.send('welcome')

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_ignore_listener(self):
        conn, child_conn = multiprocessing.Pipe()
        try:
            p = multiprocessing.Process(target=self._test_ignore_listener,
                                        args=(child_conn,))
            p.daemon = True
            p.start()
            child_conn.close()
            address = conn.recv()
            time.sleep(0.1)
            os.kill(p.pid, signal.SIGUSR1)
            time.sleep(0.1)
            client = multiprocessing.connection.Client(address)
            self.assertEqual(client.recv(), 'welcome')
            p.join()
        finally:
            conn.close()

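# A minimal sketch of the retry loop that PEP 475 moved into the interpreter:
# before Python 3.5, callers had to wrap blocking calls themselves, roughly
# like this (illustrative only; recv() now restarts transparently):
def _example_retry_on_eintr(conn):
    while True:
        try:
            return conn.recv()
        except InterruptedError:
            # a signal interrupted the system call; just retry
            continue
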
class TestStartMethod(unittest.TestCase):
    @classmethod
    def _check_context(cls, conn):
        conn.send(multiprocessing.get_start_method())

    def check_context(self, ctx):
        r, w = ctx.Pipe(duplex=False)
        p = ctx.Process(target=self._check_context, args=(w,))
        p.start()
        w.close()
        child_method = r.recv()
        r.close()
        p.join()
        self.assertEqual(child_method, ctx.get_start_method())

    def test_context(self):
        for method in ('fork', 'spawn', 'forkserver'):
            try:
                ctx = multiprocessing.get_context(method)
            except ValueError:
                continue
            self.assertEqual(ctx.get_start_method(), method)
            self.assertIs(ctx.get_context(), ctx)
            self.assertRaises(ValueError, ctx.set_start_method, 'spawn')
            self.assertRaises(ValueError, ctx.set_start_method, None)
            self.check_context(ctx)

    def test_set_get(self):
        multiprocessing.set_forkserver_preload(PRELOAD)
        count = 0
        old_method = multiprocessing.get_start_method()
        try:
            for method in ('fork', 'spawn', 'forkserver'):
                try:
                    multiprocessing.set_start_method(method, force=True)
                except ValueError:
                    continue
                self.assertEqual(multiprocessing.get_start_method(), method)
                ctx = multiprocessing.get_context()
                self.assertEqual(ctx.get_start_method(), method)
                self.assertTrue(type(ctx).__name__.lower().startswith(method))
                self.assertTrue(
                    ctx.Process.__name__.lower().startswith(method))
                self.check_context(multiprocessing)
                count += 1
        finally:
            multiprocessing.set_start_method(old_method, force=True)
        self.assertGreaterEqual(count, 1)

    def test_get_all(self):
        methods = multiprocessing.get_all_start_methods()
        if sys.platform == 'win32':
            self.assertEqual(methods, ['spawn'])
        else:
            self.assertTrue(methods == ['fork', 'spawn'] or
                            methods == ['spawn', 'fork'] or
                            methods == ['fork', 'spawn', 'forkserver'] or
                            methods == ['spawn', 'fork', 'forkserver'])

    def test_preload_resources(self):
        if multiprocessing.get_start_method() != 'forkserver':
            self.skipTest("test only relevant for 'forkserver' method")
        name = os.path.join(os.path.dirname(__file__), 'mp_preload.py')
        rc, out, err = test.support.script_helper.assert_python_ok(name)
        out = out.decode()
        err = err.decode()
        if out.rstrip() != 'ok' or err != '':
            print(out)
            print(err)
            self.fail("failed spawning forkserver or grandchild")

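# A minimal sketch (not run here) of the context API exercised above: a
# library can pin a start method locally via get_context() without touching
# the process-global default.  The helper names are illustrative only.
def _example_context_child(q):
    q.put(42)

def _example_use_spawn_context():
    ctx = multiprocessing.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=_example_context_child, args=(q,))
    p.start()
    try:
        return q.get(timeout=60)
    finally:
        p.join()
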

@unittest.skipIf(sys.platform == "win32",
                 "test semantics don't make sense on Windows")
class TestResourceTracker(unittest.TestCase):

    def test_resource_tracker(self):
        #
        # Check that killing a process does not leak named resources
        # (semaphores, shared memory)
        #
        cmd = '''if 1:
            import time, os, tempfile
            import multiprocessing as mp
            from multiprocessing import resource_tracker
            from multiprocessing.shared_memory import SharedMemory

            mp.set_start_method("spawn")
            rand = tempfile._RandomNameSequence()


            def create_and_register_resource(rtype):
                if rtype == "semaphore":
                    lock = mp.Lock()
                    return lock, lock._semlock.name
                elif rtype == "shared_memory":
                    sm = SharedMemory(create=True, size=10)
                    return sm, sm._name
                else:
                    raise ValueError(
                        "Resource type {{}} not understood".format(rtype))


            resource1, rname1 = create_and_register_resource("{rtype}")
            resource2, rname2 = create_and_register_resource("{rtype}")

            os.write({w}, rname1.encode("ascii") + b"\\n")
            os.write({w}, rname2.encode("ascii") + b"\\n")

            time.sleep(10)
        '''
        for rtype in resource_tracker._CLEANUP_FUNCS:
            with self.subTest(rtype=rtype):
                if rtype == "noop":
                    # Artefact resource type used by the resource_tracker
                    continue
                r, w = os.pipe()
                p = subprocess.Popen([sys.executable,
                                     '-E', '-c', cmd.format(w=w, rtype=rtype)],
                                     pass_fds=[w],
                                     stderr=subprocess.PIPE)
                os.close(w)
                with open(r, 'rb', closefd=True) as f:
                    name1 = f.readline().rstrip().decode('ascii')
                    name2 = f.readline().rstrip().decode('ascii')
                _resource_unlink(name1, rtype)
                p.terminate()
                p.wait()

                deadline = time.monotonic() + support.LONG_TIMEOUT
                while time.monotonic() < deadline:
                    time.sleep(.5)
                    try:
                        _resource_unlink(name2, rtype)
                    except OSError as e:
                        # docs say it should be ENOENT, but OSX seems to give
                        # EINVAL
                        self.assertIn(e.errno, (errno.ENOENT, errno.EINVAL))
                        break
                else:
                    raise AssertionError(
                        f"A {rtype} resource was leaked after a process was "
                        f"abruptly terminated.")
                err = p.stderr.read().decode('utf-8')
                p.stderr.close()
                expected = ('resource_tracker: There appear to be 2 '
                            'leaked {} objects'.format(rtype))
                self.assertRegex(err, expected)
                self.assertRegex(err, r'resource_tracker: %r: \[Errno' % name1)

    def check_resource_tracker_death(self, signum, should_die):
        # bpo-31310: if the resource tracker process has died, it should
        # be restarted implicitly.
        from multiprocessing.resource_tracker import _resource_tracker
        pid = _resource_tracker._pid
        if pid is not None:
            os.kill(pid, signal.SIGKILL)
            support.wait_process(pid, exitcode=-signal.SIGKILL)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            _resource_tracker.ensure_running()
        pid = _resource_tracker._pid

        os.kill(pid, signum)
        time.sleep(1.0)  # give it time to die

        ctx = multiprocessing.get_context("spawn")
        with warnings.catch_warnings(record=True) as all_warn:
            warnings.simplefilter("always")
            sem = ctx.Semaphore()
            sem.acquire()
            sem.release()
            wr = weakref.ref(sem)
            # ensure `sem` gets collected, which triggers communication with
            # the resource tracker
            del sem
            gc.collect()
            self.assertIsNone(wr())
            if should_die:
                self.assertEqual(len(all_warn), 1)
                the_warn = all_warn[0]
                self.assertTrue(issubclass(the_warn.category, UserWarning))
                self.assertTrue("resource_tracker: process died"
                                in str(the_warn.message))
            else:
                self.assertEqual(len(all_warn), 0)

    def test_resource_tracker_sigint(self):
        # Catchable signal (ignored by the resource tracker)
        self.check_resource_tracker_death(signal.SIGINT, False)

    def test_resource_tracker_sigterm(self):
        # Catchable signal (ignored by the resource tracker)
        self.check_resource_tracker_death(signal.SIGTERM, False)

    def test_resource_tracker_sigkill(self):
        # Uncatchable signal.
        self.check_resource_tracker_death(signal.SIGKILL, True)

    @staticmethod
    def _is_resource_tracker_reused(conn, pid):
        from multiprocessing.resource_tracker import _resource_tracker
        _resource_tracker.ensure_running()
        # The pid should be None in the child process, except for the 'fork'
        # context.  It should not be a new value.
        reused = _resource_tracker._pid in (None, pid)
        reused &= _resource_tracker._check_alive()
        conn.send(reused)

    def test_resource_tracker_reused(self):
        from multiprocessing.resource_tracker import _resource_tracker
        _resource_tracker.ensure_running()
        pid = _resource_tracker._pid

        r, w = multiprocessing.Pipe(duplex=False)
        p = multiprocessing.Process(target=self._is_resource_tracker_reused,
                                    args=(w, pid))
        p.start()
        is_resource_tracker_reused = r.recv()

        # Clean up
        p.join()
        w.close()
        r.close()

        self.assertTrue(is_resource_tracker_reused)


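if HAS_SHMEM:
    # A minimal sketch (not run by the suite) of the explicit cleanup that
    # the resource tracker backstops: close() in every process that used the
    # block, unlink() exactly once by its owner.
    def _example_shared_memory_lifecycle():
        shm = shared_memory.SharedMemory(create=True, size=16)
        try:
            shm.buf[0] = 1          # write a byte into the mapped block
        finally:
            shm.close()             # release this process's mapping
            shm.unlink()            # destroy the underlying block

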
class TestSimpleQueue(unittest.TestCase):

    @classmethod
    def _test_empty(cls, queue, child_can_start, parent_can_continue):
        child_can_start.wait()
        # issue 30301, could fail under spawn and forkserver
        try:
            queue.put(queue.empty())
            queue.put(queue.empty())
        finally:
            parent_can_continue.set()

    def test_empty(self):
        queue = multiprocessing.SimpleQueue()
        child_can_start = multiprocessing.Event()
        parent_can_continue = multiprocessing.Event()

        proc = multiprocessing.Process(
            target=self._test_empty,
            args=(queue, child_can_start, parent_can_continue)
        )
        proc.daemon = True
        proc.start()

        self.assertTrue(queue.empty())

        child_can_start.set()
        parent_can_continue.wait()

        self.assertFalse(queue.empty())
        self.assertEqual(queue.get(), True)
        self.assertEqual(queue.get(), False)
        self.assertTrue(queue.empty())

        proc.join()

    def test_close(self):
        queue = multiprocessing.SimpleQueue()
        queue.close()
        # closing a queue twice should not fail
        queue.close()

    # Test specific to CPython since it tests private attributes
    @test.support.cpython_only
    def test_closed(self):
        queue = multiprocessing.SimpleQueue()
        queue.close()
        self.assertTrue(queue._reader.closed)
        self.assertTrue(queue._writer.closed)


class TestPoolNotLeakOnFailure(unittest.TestCase):

    def test_release_unused_processes(self):
        # Issue #19675: During pool creation, if we can't create a process,
        # don't leak already created ones.
        will_fail_in = 3
        forked_processes = []

        class FailingForkProcess:
            def __init__(self, **kwargs):
                self.name = 'Fake Process'
                self.exitcode = None
                self.state = None
                forked_processes.append(self)

            def start(self):
                nonlocal will_fail_in
                if will_fail_in <= 0:
                    raise OSError("Manually induced OSError")
                will_fail_in -= 1
                self.state = 'started'

            def terminate(self):
                self.state = 'stopping'

            def join(self):
                if self.state == 'stopping':
                    self.state = 'stopped'

            def is_alive(self):
                return self.state == 'started' or self.state == 'stopping'

        with self.assertRaisesRegex(OSError, 'Manually induced OSError'):
            p = multiprocessing.pool.Pool(5, context=unittest.mock.MagicMock(
                Process=FailingForkProcess))
            p.close()
            p.join()
        self.assertFalse(
            any(process.is_alive() for process in forked_processes))


@hashlib_helper.requires_hashdigest('md5')
class TestSyncManagerTypes(unittest.TestCase):
    """Test all the types which can be shared between a parent and a
    child process by using a manager which acts as an intermediary
    between them.

    In the following unit tests the base type is created in the parent
    process; the @classmethod represents the worker process, and the
    shared object is readable and editable by both.

    # The child.
    @classmethod
    def _test_list(cls, obj):
        assert obj[0] == 5
        assert obj.append(6)

    # The parent.
    def test_list(self):
        o = self.manager.list()
        o.append(5)
        self.run_worker(self._test_list, o)
        assert o[1] == 6
    """
    manager_class = multiprocessing.managers.SyncManager

    def setUp(self):
        self.manager = self.manager_class()
        self.manager.start()
        self.proc = None

    def tearDown(self):
        if self.proc is not None and self.proc.is_alive():
            self.proc.terminate()
            self.proc.join()
        self.manager.shutdown()
        self.manager = None
        self.proc = None

    @classmethod
    def setUpClass(cls):
        support.reap_children()

    tearDownClass = setUpClass

    def wait_proc_exit(self):
        # Only the manager process should be returned by active_children()
        # but this can take a bit on slow machines, so wait a few seconds
        # if there are other children too (see #17395).
        join_process(self.proc)
        start_time = time.monotonic()
        t = 0.01
        while len(multiprocessing.active_children()) > 1:
            time.sleep(t)
            t *= 2
            dt = time.monotonic() - start_time
            if dt >= 5.0:
                test.support.environment_altered = True
                support.print_warning(f"multiprocessing.Manager still has "
                                      f"{multiprocessing.active_children()} "
                                      f"active children after {dt} seconds")
                break

    def run_worker(self, worker, obj):
        self.proc = multiprocessing.Process(target=worker, args=(obj,))
        self.proc.daemon = True
        self.proc.start()
        self.wait_proc_exit()
        self.assertEqual(self.proc.exitcode, 0)

    @classmethod
    def _test_event(cls, obj):
        assert obj.is_set()
        obj.wait()
        obj.clear()
        obj.wait(0.001)

    def test_event(self):
        o = self.manager.Event()
        o.set()
        self.run_worker(self._test_event, o)
        assert not o.is_set()
        o.wait(0.001)

    @classmethod
    def _test_lock(cls, obj):
        obj.acquire()

    def test_lock(self, lname="Lock"):
        o = getattr(self.manager, lname)()
        self.run_worker(self._test_lock, o)
        o.release()
        self.assertRaises(RuntimeError, o.release)  # already released

    @classmethod
    def _test_rlock(cls, obj):
        obj.acquire()
        obj.release()

    def test_rlock(self, lname="Lock"):
        o = getattr(self.manager, lname)()
        self.run_worker(self._test_rlock, o)

    @classmethod
    def _test_semaphore(cls, obj):
        obj.acquire()

    def test_semaphore(self, sname="Semaphore"):
        o = getattr(self.manager, sname)()
        self.run_worker(self._test_semaphore, o)
        o.release()

    def test_bounded_semaphore(self):
        self.test_semaphore(sname="BoundedSemaphore")

    @classmethod
    def _test_condition(cls, obj):
        obj.acquire()
        obj.release()

    def test_condition(self):
        o = self.manager.Condition()
        self.run_worker(self._test_condition, o)

    @classmethod
    def _test_barrier(cls, obj):
        assert obj.parties == 5
        obj.reset()

    def test_barrier(self):
        o = self.manager.Barrier(5)
        self.run_worker(self._test_barrier, o)

    @classmethod
    def _test_pool(cls, obj):
        # TODO: fix https://bugs.python.org/issue35919
        with obj:
            pass

    def test_pool(self):
        o = self.manager.Pool(processes=4)
        self.run_worker(self._test_pool, o)

    @classmethod
    def _test_queue(cls, obj):
        assert obj.qsize() == 2
        assert obj.full()
        assert not obj.empty()
        assert obj.get() == 5
        assert not obj.empty()
        assert obj.get() == 6
        assert obj.empty()

    def test_queue(self, qname="Queue"):
        o = getattr(self.manager, qname)(2)
        o.put(5)
        o.put(6)
        self.run_worker(self._test_queue, o)
        assert o.empty()
        assert not o.full()

    def test_joinable_queue(self):
        self.test_queue("JoinableQueue")

    @classmethod
    def _test_list(cls, obj):
        assert obj[0] == 5
        assert obj.count(5) == 1
        assert obj.index(5) == 0
        obj.sort()
        obj.reverse()
        for x in obj:
            pass
        assert len(obj) == 1
        assert obj.pop(0) == 5

    def test_list(self):
        o = self.manager.list()
        o.append(5)
        self.run_worker(self._test_list, o)
        assert not o
        self.assertEqual(len(o), 0)

    @classmethod
    def _test_dict(cls, obj):
        assert len(obj) == 1
        assert obj['foo'] == 5
        assert obj.get('foo') == 5
        assert list(obj.items()) == [('foo', 5)]
        assert list(obj.keys()) == ['foo']
        assert list(obj.values()) == [5]
        assert obj.copy() == {'foo': 5}
        assert obj.popitem() == ('foo', 5)

    def test_dict(self):
        o = self.manager.dict()
        o['foo'] = 5
        self.run_worker(self._test_dict, o)
        assert not o
        self.assertEqual(len(o), 0)

    @classmethod
    def _test_value(cls, obj):
        assert obj.value == 1
        assert obj.get() == 1
        obj.set(2)

    def test_value(self):
        o = self.manager.Value('i', 1)
        self.run_worker(self._test_value, o)
        self.assertEqual(o.value, 2)
        self.assertEqual(o.get(), 2)

    @classmethod
    def _test_array(cls, obj):
        assert obj[0] == 0
        assert obj[1] == 1
        assert len(obj) == 2
        assert list(obj) == [0, 1]

    def test_array(self):
        o = self.manager.Array('i', [0, 1])
        self.run_worker(self._test_array, o)

    @classmethod
    def _test_namespace(cls, obj):
        assert obj.x == 0
        assert obj.y == 1

    def test_namespace(self):
        o = self.manager.Namespace()
        o.x = 0
        o.y = 1
        self.run_worker(self._test_namespace, o)


class MiscTestCase(unittest.TestCase):
    def test__all__(self):
        # Just make sure names in not_exported are excluded
        support.check__all__(self, multiprocessing, extra=multiprocessing.__all__,
                             not_exported=['SUBDEBUG', 'SUBWARNING'])


#
# Mixins
#

class BaseMixin(object):
    @classmethod
    def setUpClass(cls):
        cls.dangling = (multiprocessing.process._dangling.copy(),
                        threading._dangling.copy())

    @classmethod
    def tearDownClass(cls):
        # bpo-26762: Some multiprocessing objects like Pool create reference
        # cycles. Trigger a garbage collection to break these cycles.
        test.support.gc_collect()

        processes = set(multiprocessing.process._dangling) - set(cls.dangling[0])
        if processes:
            test.support.environment_altered = True
            support.print_warning(f'Dangling processes: {processes}')
        processes = None

        threads = set(threading._dangling) - set(cls.dangling[1])
        if threads:
            test.support.environment_altered = True
            support.print_warning(f'Dangling threads: {threads}')
        threads = None


class ProcessesMixin(BaseMixin):
    TYPE = 'processes'
    Process = multiprocessing.Process
    connection = multiprocessing.connection
    current_process = staticmethod(multiprocessing.current_process)
    parent_process = staticmethod(multiprocessing.parent_process)
    active_children = staticmethod(multiprocessing.active_children)
    Pool = staticmethod(multiprocessing.Pool)
    Pipe = staticmethod(multiprocessing.Pipe)
    Queue = staticmethod(multiprocessing.Queue)
    JoinableQueue = staticmethod(multiprocessing.JoinableQueue)
    Lock = staticmethod(multiprocessing.Lock)
    RLock = staticmethod(multiprocessing.RLock)
    Semaphore = staticmethod(multiprocessing.Semaphore)
    BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
    Condition = staticmethod(multiprocessing.Condition)
    Event = staticmethod(multiprocessing.Event)
    Barrier = staticmethod(multiprocessing.Barrier)
    Value = staticmethod(multiprocessing.Value)
    Array = staticmethod(multiprocessing.Array)
    RawValue = staticmethod(multiprocessing.RawValue)
    RawArray = staticmethod(multiprocessing.RawArray)


class ManagerMixin(BaseMixin):
    TYPE = 'manager'
    Process = multiprocessing.Process
    Queue = property(operator.attrgetter('manager.Queue'))
    JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))
    Lock = property(operator.attrgetter('manager.Lock'))
    RLock = property(operator.attrgetter('manager.RLock'))
    Semaphore = property(operator.attrgetter('manager.Semaphore'))
    BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
    Condition = property(operator.attrgetter('manager.Condition'))
    Event = property(operator.attrgetter('manager.Event'))
    Barrier = property(operator.attrgetter('manager.Barrier'))
    Value = property(operator.attrgetter('manager.Value'))
    Array = property(operator.attrgetter('manager.Array'))
    list = property(operator.attrgetter('manager.list'))
    dict = property(operator.attrgetter('manager.dict'))
    Namespace = property(operator.attrgetter('manager.Namespace'))

    @classmethod
    def Pool(cls, *args, **kwds):
        return cls.manager.Pool(*args, **kwds)

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        cls.manager = multiprocessing.Manager()

    @classmethod
    def tearDownClass(cls):
        # only the manager process should be returned by active_children()
        # but this can take a bit on slow machines, so wait a few seconds
        # if there are other children too (see #17395)
        start_time = time.monotonic()
        t = 0.01
        while len(multiprocessing.active_children()) > 1:
            time.sleep(t)
            t *= 2
            dt = time.monotonic() - start_time
            if dt >= 5.0:
                test.support.environment_altered = True
                support.print_warning(f"multiprocessing.Manager still has "
                                      f"{multiprocessing.active_children()} "
                                      f"active children after {dt} seconds")
                break

        gc.collect()                       # do garbage collection
        if cls.manager._number_of_objects() != 0:
            # This is not really an error since some tests do not
            # ensure that all processes which hold a reference to a
            # managed object have been joined.
            test.support.environment_altered = True
            support.print_warning('Shared objects which still exist '
                                  'at manager shutdown:')
            support.print_warning(cls.manager._debug_info())
        cls.manager.shutdown()
        cls.manager.join()
        cls.manager = None

        super().tearDownClass()


class ThreadsMixin(BaseMixin):
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    connection = multiprocessing.dummy.connection
    current_process = staticmethod(multiprocessing.dummy.current_process)
    active_children = staticmethod(multiprocessing.dummy.active_children)
    Pool = staticmethod(multiprocessing.dummy.Pool)
    Pipe = staticmethod(multiprocessing.dummy.Pipe)
    Queue = staticmethod(multiprocessing.dummy.Queue)
    JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)
    Lock = staticmethod(multiprocessing.dummy.Lock)
    RLock = staticmethod(multiprocessing.dummy.RLock)
    Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
    BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
    Condition = staticmethod(multiprocessing.dummy.Condition)
    Event = staticmethod(multiprocessing.dummy.Event)
    Barrier = staticmethod(multiprocessing.dummy.Barrier)
    Value = staticmethod(multiprocessing.dummy.Value)
    Array = staticmethod(multiprocessing.dummy.Array)

#
# Functions used to create test cases from the base ones in this module
#

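# Typical use, from one of the per-start-method test files (e.g.
# test_multiprocessing_spawn):
#
#     from test import _test_multiprocessing
#     _test_multiprocessing.install_tests_in_module_dict(globals(), 'spawn')
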
def install_tests_in_module_dict(remote_globs, start_method):
    __module__ = remote_globs['__name__']
    local_globs = globals()
    ALL_TYPES = {'processes', 'threads', 'manager'}

    for name, base in local_globs.items():
        if not isinstance(base, type):
            continue
        if issubclass(base, BaseTestCase):
            if base is BaseTestCase:
                continue
            assert set(base.ALLOWED_TYPES) <= ALL_TYPES, base.ALLOWED_TYPES
            for type_ in base.ALLOWED_TYPES:
                newname = 'With' + type_.capitalize() + name[1:]
                Mixin = local_globs[type_.capitalize() + 'Mixin']
                class Temp(base, Mixin, unittest.TestCase):
                    pass
                if type_ == 'manager':
                    Temp = hashlib_helper.requires_hashdigest('md5')(Temp)
                Temp.__name__ = Temp.__qualname__ = newname
                Temp.__module__ = __module__
                remote_globs[newname] = Temp
        elif issubclass(base, unittest.TestCase):
            class Temp(base, object):
                pass
            Temp.__name__ = Temp.__qualname__ = name
            Temp.__module__ = __module__
            remote_globs[name] = Temp

    dangling = [None, None]
    old_start_method = [None]

    def setUpModule():
        multiprocessing.set_forkserver_preload(PRELOAD)
        multiprocessing.process._cleanup()
        dangling[0] = multiprocessing.process._dangling.copy()
        dangling[1] = threading._dangling.copy()
        old_start_method[0] = multiprocessing.get_start_method(allow_none=True)
        try:
            multiprocessing.set_start_method(start_method, force=True)
        except ValueError:
            raise unittest.SkipTest(start_method +
                                    ' start method not supported')

        if sys.platform.startswith("linux"):
            try:
                lock = multiprocessing.RLock()
            except OSError:
                raise unittest.SkipTest("OSError is raised on RLock creation, "
                                        "see issue 3111!")
        check_enough_semaphores()
        util.get_temp_dir()     # creates temp directory
        multiprocessing.get_logger().setLevel(LOG_LEVEL)

    def tearDownModule():
        need_sleep = False

        # bpo-26762: Some multiprocessing objects like Pool create reference
        # cycles. Trigger a garbage collection to break these cycles.
        test.support.gc_collect()

        multiprocessing.set_start_method(old_start_method[0], force=True)
        # pause a bit so we don't get warnings about dangling threads/processes
        processes = set(multiprocessing.process._dangling) - set(dangling[0])
        if processes:
            need_sleep = True
            test.support.environment_altered = True
            support.print_warning(f'Dangling processes: {processes}')
        processes = None

        threads = set(threading._dangling) - set(dangling[1])
        if threads:
            need_sleep = True
            test.support.environment_altered = True
            support.print_warning(f'Dangling threads: {threads}')
        threads = None

        # Sleep 500 ms to give child processes time to complete.
        if need_sleep:
            time.sleep(0.5)

        multiprocessing.util._cleanup_tests()

    remote_globs['setUpModule'] = setUpModule
    remote_globs['tearDownModule'] = tearDownModule