• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#
2# Unit tests for the multiprocessing package
3#
4
5import unittest
6import unittest.mock
7import queue as pyqueue
8import time
9import io
10import itertools
11import sys
12import os
13import gc
14import errno
15import signal
16import array
17import socket
18import random
19import logging
20import subprocess
21import struct
22import operator
23import pickle
24import weakref
25import warnings
26import test.support
27import test.support.script_helper
28from test import support
29from test.support import hashlib_helper
30from test.support import import_helper
31from test.support import os_helper
32from test.support import socket_helper
33from test.support import threading_helper
34from test.support import warnings_helper
35
36
37# Skip tests if _multiprocessing wasn't built.
38_multiprocessing = import_helper.import_module('_multiprocessing')
39# Skip tests if sem_open implementation is broken.
40support.skip_if_broken_multiprocessing_synchronize()
41import threading
42
43import multiprocessing.connection
44import multiprocessing.dummy
45import multiprocessing.heap
46import multiprocessing.managers
47import multiprocessing.pool
48import multiprocessing.queues
49
50from multiprocessing import util
51
52try:
53    from multiprocessing import reduction
54    HAS_REDUCTION = reduction.HAVE_SEND_HANDLE
55except ImportError:
56    HAS_REDUCTION = False
57
58try:
59    from multiprocessing.sharedctypes import Value, copy
60    HAS_SHAREDCTYPES = True
61except ImportError:
62    HAS_SHAREDCTYPES = False
63
64try:
65    from multiprocessing import shared_memory
66    HAS_SHMEM = True
67except ImportError:
68    HAS_SHMEM = False
69
70try:
71    import msvcrt
72except ImportError:
73    msvcrt = None
74
75
76def latin(s):
77    return s.encode('latin')
78
79
80def close_queue(queue):
81    if isinstance(queue, multiprocessing.queues.Queue):
82        queue.close()
83        queue.join_thread()
84
85
86def join_process(process):
87    # Since multiprocessing.Process has the same API than threading.Thread
88    # (join() and is_alive(), the support function can be reused
89    threading_helper.join_thread(process)
90
91
92if os.name == "posix":
93    from multiprocessing import resource_tracker
94
95    def _resource_unlink(name, rtype):
96        resource_tracker._CLEANUP_FUNCS[rtype](name)
97
98
99#
100# Constants
101#
102
103LOG_LEVEL = util.SUBWARNING
104#LOG_LEVEL = logging.DEBUG
105
106DELTA = 0.1
107CHECK_TIMINGS = False     # making true makes tests take a lot longer
108                          # and can sometimes cause some non-serious
109                          # failures because some calls block a bit
110                          # longer than expected
111if CHECK_TIMINGS:
112    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
113else:
114    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
115
116HAVE_GETVALUE = not getattr(_multiprocessing,
117                            'HAVE_BROKEN_SEM_GETVALUE', False)
118
119WIN32 = (sys.platform == "win32")
120
121from multiprocessing.connection import wait
122
123def wait_for_handle(handle, timeout):
124    if timeout is not None and timeout < 0.0:
125        timeout = None
126    return wait([handle], timeout)
127
128try:
129    MAXFD = os.sysconf("SC_OPEN_MAX")
130except:
131    MAXFD = 256
132
133# To speed up tests when using the forkserver, we can preload these:
134PRELOAD = ['__main__', 'test.test_multiprocessing_forkserver']
135
136#
137# Some tests require ctypes
138#
139
140try:
141    from ctypes import Structure, c_int, c_double, c_longlong
142except ImportError:
143    Structure = object
144    c_int = c_double = c_longlong = None
145
146
147def check_enough_semaphores():
148    """Check that the system supports enough semaphores to run the test."""
149    # minimum number of semaphores available according to POSIX
150    nsems_min = 256
151    try:
152        nsems = os.sysconf("SC_SEM_NSEMS_MAX")
153    except (AttributeError, ValueError):
154        # sysconf not available or setting not available
155        return
156    if nsems == -1 or nsems >= nsems_min:
157        return
158    raise unittest.SkipTest("The OS doesn't support enough semaphores "
159                            "to run the test (required: %d)." % nsems_min)
160
161
162#
163# Creates a wrapper for a function which records the time it takes to finish
164#
165
166class TimingWrapper(object):
167
168    def __init__(self, func):
169        self.func = func
170        self.elapsed = None
171
172    def __call__(self, *args, **kwds):
173        t = time.monotonic()
174        try:
175            return self.func(*args, **kwds)
176        finally:
177            self.elapsed = time.monotonic() - t
178
179#
180# Base class for test cases
181#
182
183class BaseTestCase(object):
184
185    ALLOWED_TYPES = ('processes', 'manager', 'threads')
186
187    def assertTimingAlmostEqual(self, a, b):
188        if CHECK_TIMINGS:
189            self.assertAlmostEqual(a, b, 1)
190
191    def assertReturnsIfImplemented(self, value, func, *args):
192        try:
193            res = func(*args)
194        except NotImplementedError:
195            pass
196        else:
197            return self.assertEqual(value, res)
198
199    # For the sanity of Windows users, rather than crashing or freezing in
200    # multiple ways.
201    def __reduce__(self, *args):
202        raise NotImplementedError("shouldn't try to pickle a test case")
203
204    __reduce_ex__ = __reduce__
205
206#
207# Return the value of a semaphore
208#
209
210def get_value(self):
211    try:
212        return self.get_value()
213    except AttributeError:
214        try:
215            return self._Semaphore__value
216        except AttributeError:
217            try:
218                return self._value
219            except AttributeError:
220                raise NotImplementedError
221
222#
223# Testcases
224#
225
226class DummyCallable:
227    def __call__(self, q, c):
228        assert isinstance(c, DummyCallable)
229        q.put(5)
230
231
232class _TestProcess(BaseTestCase):
233
234    ALLOWED_TYPES = ('processes', 'threads')
235
236    def test_current(self):
237        if self.TYPE == 'threads':
238            self.skipTest('test not appropriate for {}'.format(self.TYPE))
239
240        current = self.current_process()
241        authkey = current.authkey
242
243        self.assertTrue(current.is_alive())
244        self.assertTrue(not current.daemon)
245        self.assertIsInstance(authkey, bytes)
246        self.assertTrue(len(authkey) > 0)
247        self.assertEqual(current.ident, os.getpid())
248        self.assertEqual(current.exitcode, None)
249
250    def test_daemon_argument(self):
251        if self.TYPE == "threads":
252            self.skipTest('test not appropriate for {}'.format(self.TYPE))
253
254        # By default uses the current process's daemon flag.
255        proc0 = self.Process(target=self._test)
256        self.assertEqual(proc0.daemon, self.current_process().daemon)
257        proc1 = self.Process(target=self._test, daemon=True)
258        self.assertTrue(proc1.daemon)
259        proc2 = self.Process(target=self._test, daemon=False)
260        self.assertFalse(proc2.daemon)
261
262    @classmethod
263    def _test(cls, q, *args, **kwds):
264        current = cls.current_process()
265        q.put(args)
266        q.put(kwds)
267        q.put(current.name)
268        if cls.TYPE != 'threads':
269            q.put(bytes(current.authkey))
270            q.put(current.pid)
271
272    def test_parent_process_attributes(self):
273        if self.TYPE == "threads":
274            self.skipTest('test not appropriate for {}'.format(self.TYPE))
275
276        self.assertIsNone(self.parent_process())
277
278        rconn, wconn = self.Pipe(duplex=False)
279        p = self.Process(target=self._test_send_parent_process, args=(wconn,))
280        p.start()
281        p.join()
282        parent_pid, parent_name = rconn.recv()
283        self.assertEqual(parent_pid, self.current_process().pid)
284        self.assertEqual(parent_pid, os.getpid())
285        self.assertEqual(parent_name, self.current_process().name)
286
287    @classmethod
288    def _test_send_parent_process(cls, wconn):
289        from multiprocessing.process import parent_process
290        wconn.send([parent_process().pid, parent_process().name])
291
292    def test_parent_process(self):
293        if self.TYPE == "threads":
294            self.skipTest('test not appropriate for {}'.format(self.TYPE))
295
296        # Launch a child process. Make it launch a grandchild process. Kill the
297        # child process and make sure that the grandchild notices the death of
298        # its parent (a.k.a the child process).
299        rconn, wconn = self.Pipe(duplex=False)
300        p = self.Process(
301            target=self._test_create_grandchild_process, args=(wconn, ))
302        p.start()
303
304        if not rconn.poll(timeout=support.LONG_TIMEOUT):
305            raise AssertionError("Could not communicate with child process")
306        parent_process_status = rconn.recv()
307        self.assertEqual(parent_process_status, "alive")
308
309        p.terminate()
310        p.join()
311
312        if not rconn.poll(timeout=support.LONG_TIMEOUT):
313            raise AssertionError("Could not communicate with child process")
314        parent_process_status = rconn.recv()
315        self.assertEqual(parent_process_status, "not alive")
316
317    @classmethod
318    def _test_create_grandchild_process(cls, wconn):
319        p = cls.Process(target=cls._test_report_parent_status, args=(wconn, ))
320        p.start()
321        time.sleep(300)
322
323    @classmethod
324    def _test_report_parent_status(cls, wconn):
325        from multiprocessing.process import parent_process
326        wconn.send("alive" if parent_process().is_alive() else "not alive")
327        parent_process().join(timeout=support.SHORT_TIMEOUT)
328        wconn.send("alive" if parent_process().is_alive() else "not alive")
329
330    def test_process(self):
331        q = self.Queue(1)
332        e = self.Event()
333        args = (q, 1, 2)
334        kwargs = {'hello':23, 'bye':2.54}
335        name = 'SomeProcess'
336        p = self.Process(
337            target=self._test, args=args, kwargs=kwargs, name=name
338            )
339        p.daemon = True
340        current = self.current_process()
341
342        if self.TYPE != 'threads':
343            self.assertEqual(p.authkey, current.authkey)
344        self.assertEqual(p.is_alive(), False)
345        self.assertEqual(p.daemon, True)
346        self.assertNotIn(p, self.active_children())
347        self.assertTrue(type(self.active_children()) is list)
348        self.assertEqual(p.exitcode, None)
349
350        p.start()
351
352        self.assertEqual(p.exitcode, None)
353        self.assertEqual(p.is_alive(), True)
354        self.assertIn(p, self.active_children())
355
356        self.assertEqual(q.get(), args[1:])
357        self.assertEqual(q.get(), kwargs)
358        self.assertEqual(q.get(), p.name)
359        if self.TYPE != 'threads':
360            self.assertEqual(q.get(), current.authkey)
361            self.assertEqual(q.get(), p.pid)
362
363        p.join()
364
365        self.assertEqual(p.exitcode, 0)
366        self.assertEqual(p.is_alive(), False)
367        self.assertNotIn(p, self.active_children())
368        close_queue(q)
369
370    @unittest.skipUnless(threading._HAVE_THREAD_NATIVE_ID, "needs native_id")
371    def test_process_mainthread_native_id(self):
372        if self.TYPE == 'threads':
373            self.skipTest('test not appropriate for {}'.format(self.TYPE))
374
375        current_mainthread_native_id = threading.main_thread().native_id
376
377        q = self.Queue(1)
378        p = self.Process(target=self._test_process_mainthread_native_id, args=(q,))
379        p.start()
380
381        child_mainthread_native_id = q.get()
382        p.join()
383        close_queue(q)
384
385        self.assertNotEqual(current_mainthread_native_id, child_mainthread_native_id)
386
387    @classmethod
388    def _test_process_mainthread_native_id(cls, q):
389        mainthread_native_id = threading.main_thread().native_id
390        q.put(mainthread_native_id)
391
392    @classmethod
393    def _sleep_some(cls):
394        time.sleep(100)
395
396    @classmethod
397    def _test_sleep(cls, delay):
398        time.sleep(delay)
399
400    def _kill_process(self, meth):
401        if self.TYPE == 'threads':
402            self.skipTest('test not appropriate for {}'.format(self.TYPE))
403
404        p = self.Process(target=self._sleep_some)
405        p.daemon = True
406        p.start()
407
408        self.assertEqual(p.is_alive(), True)
409        self.assertIn(p, self.active_children())
410        self.assertEqual(p.exitcode, None)
411
412        join = TimingWrapper(p.join)
413
414        self.assertEqual(join(0), None)
415        self.assertTimingAlmostEqual(join.elapsed, 0.0)
416        self.assertEqual(p.is_alive(), True)
417
418        self.assertEqual(join(-1), None)
419        self.assertTimingAlmostEqual(join.elapsed, 0.0)
420        self.assertEqual(p.is_alive(), True)
421
422        # XXX maybe terminating too soon causes the problems on Gentoo...
423        time.sleep(1)
424
425        meth(p)
426
427        if hasattr(signal, 'alarm'):
428            # On the Gentoo buildbot waitpid() often seems to block forever.
429            # We use alarm() to interrupt it if it blocks for too long.
430            def handler(*args):
431                raise RuntimeError('join took too long: %s' % p)
432            old_handler = signal.signal(signal.SIGALRM, handler)
433            try:
434                signal.alarm(10)
435                self.assertEqual(join(), None)
436            finally:
437                signal.alarm(0)
438                signal.signal(signal.SIGALRM, old_handler)
439        else:
440            self.assertEqual(join(), None)
441
442        self.assertTimingAlmostEqual(join.elapsed, 0.0)
443
444        self.assertEqual(p.is_alive(), False)
445        self.assertNotIn(p, self.active_children())
446
447        p.join()
448
449        return p.exitcode
450
451    def test_terminate(self):
452        exitcode = self._kill_process(multiprocessing.Process.terminate)
453        if os.name != 'nt':
454            self.assertEqual(exitcode, -signal.SIGTERM)
455
456    def test_kill(self):
457        exitcode = self._kill_process(multiprocessing.Process.kill)
458        if os.name != 'nt':
459            self.assertEqual(exitcode, -signal.SIGKILL)
460
461    def test_cpu_count(self):
462        try:
463            cpus = multiprocessing.cpu_count()
464        except NotImplementedError:
465            cpus = 1
466        self.assertTrue(type(cpus) is int)
467        self.assertTrue(cpus >= 1)
468
469    def test_active_children(self):
470        self.assertEqual(type(self.active_children()), list)
471
472        p = self.Process(target=time.sleep, args=(DELTA,))
473        self.assertNotIn(p, self.active_children())
474
475        p.daemon = True
476        p.start()
477        self.assertIn(p, self.active_children())
478
479        p.join()
480        self.assertNotIn(p, self.active_children())
481
482    @classmethod
483    def _test_recursion(cls, wconn, id):
484        wconn.send(id)
485        if len(id) < 2:
486            for i in range(2):
487                p = cls.Process(
488                    target=cls._test_recursion, args=(wconn, id+[i])
489                    )
490                p.start()
491                p.join()
492
493    def test_recursion(self):
494        rconn, wconn = self.Pipe(duplex=False)
495        self._test_recursion(wconn, [])
496
497        time.sleep(DELTA)
498        result = []
499        while rconn.poll():
500            result.append(rconn.recv())
501
502        expected = [
503            [],
504              [0],
505                [0, 0],
506                [0, 1],
507              [1],
508                [1, 0],
509                [1, 1]
510            ]
511        self.assertEqual(result, expected)
512
513    @classmethod
514    def _test_sentinel(cls, event):
515        event.wait(10.0)
516
517    def test_sentinel(self):
518        if self.TYPE == "threads":
519            self.skipTest('test not appropriate for {}'.format(self.TYPE))
520        event = self.Event()
521        p = self.Process(target=self._test_sentinel, args=(event,))
522        with self.assertRaises(ValueError):
523            p.sentinel
524        p.start()
525        self.addCleanup(p.join)
526        sentinel = p.sentinel
527        self.assertIsInstance(sentinel, int)
528        self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
529        event.set()
530        p.join()
531        self.assertTrue(wait_for_handle(sentinel, timeout=1))
532
533    @classmethod
534    def _test_close(cls, rc=0, q=None):
535        if q is not None:
536            q.get()
537        sys.exit(rc)
538
539    def test_close(self):
540        if self.TYPE == "threads":
541            self.skipTest('test not appropriate for {}'.format(self.TYPE))
542        q = self.Queue()
543        p = self.Process(target=self._test_close, kwargs={'q': q})
544        p.daemon = True
545        p.start()
546        self.assertEqual(p.is_alive(), True)
547        # Child is still alive, cannot close
548        with self.assertRaises(ValueError):
549            p.close()
550
551        q.put(None)
552        p.join()
553        self.assertEqual(p.is_alive(), False)
554        self.assertEqual(p.exitcode, 0)
555        p.close()
556        with self.assertRaises(ValueError):
557            p.is_alive()
558        with self.assertRaises(ValueError):
559            p.join()
560        with self.assertRaises(ValueError):
561            p.terminate()
562        p.close()
563
564        wr = weakref.ref(p)
565        del p
566        gc.collect()
567        self.assertIs(wr(), None)
568
569        close_queue(q)
570
571    def test_many_processes(self):
572        if self.TYPE == 'threads':
573            self.skipTest('test not appropriate for {}'.format(self.TYPE))
574
575        sm = multiprocessing.get_start_method()
576        N = 5 if sm == 'spawn' else 100
577
578        # Try to overwhelm the forkserver loop with events
579        procs = [self.Process(target=self._test_sleep, args=(0.01,))
580                 for i in range(N)]
581        for p in procs:
582            p.start()
583        for p in procs:
584            join_process(p)
585        for p in procs:
586            self.assertEqual(p.exitcode, 0)
587
588        procs = [self.Process(target=self._sleep_some)
589                 for i in range(N)]
590        for p in procs:
591            p.start()
592        time.sleep(0.001)  # let the children start...
593        for p in procs:
594            p.terminate()
595        for p in procs:
596            join_process(p)
597        if os.name != 'nt':
598            exitcodes = [-signal.SIGTERM]
599            if sys.platform == 'darwin':
600                # bpo-31510: On macOS, killing a freshly started process with
601                # SIGTERM sometimes kills the process with SIGKILL.
602                exitcodes.append(-signal.SIGKILL)
603            for p in procs:
604                self.assertIn(p.exitcode, exitcodes)
605
606    def test_lose_target_ref(self):
607        c = DummyCallable()
608        wr = weakref.ref(c)
609        q = self.Queue()
610        p = self.Process(target=c, args=(q, c))
611        del c
612        p.start()
613        p.join()
614        gc.collect()  # For PyPy or other GCs.
615        self.assertIs(wr(), None)
616        self.assertEqual(q.get(), 5)
617        close_queue(q)
618
619    @classmethod
620    def _test_child_fd_inflation(self, evt, q):
621        q.put(os_helper.fd_count())
622        evt.wait()
623
624    def test_child_fd_inflation(self):
625        # Number of fds in child processes should not grow with the
626        # number of running children.
627        if self.TYPE == 'threads':
628            self.skipTest('test not appropriate for {}'.format(self.TYPE))
629
630        sm = multiprocessing.get_start_method()
631        if sm == 'fork':
632            # The fork method by design inherits all fds from the parent,
633            # trying to go against it is a lost battle
634            self.skipTest('test not appropriate for {}'.format(sm))
635
636        N = 5
637        evt = self.Event()
638        q = self.Queue()
639
640        procs = [self.Process(target=self._test_child_fd_inflation, args=(evt, q))
641                 for i in range(N)]
642        for p in procs:
643            p.start()
644
645        try:
646            fd_counts = [q.get() for i in range(N)]
647            self.assertEqual(len(set(fd_counts)), 1, fd_counts)
648
649        finally:
650            evt.set()
651            for p in procs:
652                p.join()
653            close_queue(q)
654
655    @classmethod
656    def _test_wait_for_threads(self, evt):
657        def func1():
658            time.sleep(0.5)
659            evt.set()
660
661        def func2():
662            time.sleep(20)
663            evt.clear()
664
665        threading.Thread(target=func1).start()
666        threading.Thread(target=func2, daemon=True).start()
667
668    def test_wait_for_threads(self):
669        # A child process should wait for non-daemonic threads to end
670        # before exiting
671        if self.TYPE == 'threads':
672            self.skipTest('test not appropriate for {}'.format(self.TYPE))
673
674        evt = self.Event()
675        proc = self.Process(target=self._test_wait_for_threads, args=(evt,))
676        proc.start()
677        proc.join()
678        self.assertTrue(evt.is_set())
679
680    @classmethod
681    def _test_error_on_stdio_flush(self, evt, break_std_streams={}):
682        for stream_name, action in break_std_streams.items():
683            if action == 'close':
684                stream = io.StringIO()
685                stream.close()
686            else:
687                assert action == 'remove'
688                stream = None
689            setattr(sys, stream_name, None)
690        evt.set()
691
692    def test_error_on_stdio_flush_1(self):
693        # Check that Process works with broken standard streams
694        streams = [io.StringIO(), None]
695        streams[0].close()
696        for stream_name in ('stdout', 'stderr'):
697            for stream in streams:
698                old_stream = getattr(sys, stream_name)
699                setattr(sys, stream_name, stream)
700                try:
701                    evt = self.Event()
702                    proc = self.Process(target=self._test_error_on_stdio_flush,
703                                        args=(evt,))
704                    proc.start()
705                    proc.join()
706                    self.assertTrue(evt.is_set())
707                    self.assertEqual(proc.exitcode, 0)
708                finally:
709                    setattr(sys, stream_name, old_stream)
710
711    def test_error_on_stdio_flush_2(self):
712        # Same as test_error_on_stdio_flush_1(), but standard streams are
713        # broken by the child process
714        for stream_name in ('stdout', 'stderr'):
715            for action in ('close', 'remove'):
716                old_stream = getattr(sys, stream_name)
717                try:
718                    evt = self.Event()
719                    proc = self.Process(target=self._test_error_on_stdio_flush,
720                                        args=(evt, {stream_name: action}))
721                    proc.start()
722                    proc.join()
723                    self.assertTrue(evt.is_set())
724                    self.assertEqual(proc.exitcode, 0)
725                finally:
726                    setattr(sys, stream_name, old_stream)
727
728    @classmethod
729    def _sleep_and_set_event(self, evt, delay=0.0):
730        time.sleep(delay)
731        evt.set()
732
733    def check_forkserver_death(self, signum):
734        # bpo-31308: if the forkserver process has died, we should still
735        # be able to create and run new Process instances (the forkserver
736        # is implicitly restarted).
737        if self.TYPE == 'threads':
738            self.skipTest('test not appropriate for {}'.format(self.TYPE))
739        sm = multiprocessing.get_start_method()
740        if sm != 'forkserver':
741            # The fork method by design inherits all fds from the parent,
742            # trying to go against it is a lost battle
743            self.skipTest('test not appropriate for {}'.format(sm))
744
745        from multiprocessing.forkserver import _forkserver
746        _forkserver.ensure_running()
747
748        # First process sleeps 500 ms
749        delay = 0.5
750
751        evt = self.Event()
752        proc = self.Process(target=self._sleep_and_set_event, args=(evt, delay))
753        proc.start()
754
755        pid = _forkserver._forkserver_pid
756        os.kill(pid, signum)
757        # give time to the fork server to die and time to proc to complete
758        time.sleep(delay * 2.0)
759
760        evt2 = self.Event()
761        proc2 = self.Process(target=self._sleep_and_set_event, args=(evt2,))
762        proc2.start()
763        proc2.join()
764        self.assertTrue(evt2.is_set())
765        self.assertEqual(proc2.exitcode, 0)
766
767        proc.join()
768        self.assertTrue(evt.is_set())
769        self.assertIn(proc.exitcode, (0, 255))
770
771    def test_forkserver_sigint(self):
772        # Catchable signal
773        self.check_forkserver_death(signal.SIGINT)
774
775    def test_forkserver_sigkill(self):
776        # Uncatchable signal
777        if os.name != 'nt':
778            self.check_forkserver_death(signal.SIGKILL)
779
780
781#
782#
783#
784
785class _UpperCaser(multiprocessing.Process):
786
787    def __init__(self):
788        multiprocessing.Process.__init__(self)
789        self.child_conn, self.parent_conn = multiprocessing.Pipe()
790
791    def run(self):
792        self.parent_conn.close()
793        for s in iter(self.child_conn.recv, None):
794            self.child_conn.send(s.upper())
795        self.child_conn.close()
796
797    def submit(self, s):
798        assert type(s) is str
799        self.parent_conn.send(s)
800        return self.parent_conn.recv()
801
802    def stop(self):
803        self.parent_conn.send(None)
804        self.parent_conn.close()
805        self.child_conn.close()
806
807class _TestSubclassingProcess(BaseTestCase):
808
809    ALLOWED_TYPES = ('processes',)
810
811    def test_subclassing(self):
812        uppercaser = _UpperCaser()
813        uppercaser.daemon = True
814        uppercaser.start()
815        self.assertEqual(uppercaser.submit('hello'), 'HELLO')
816        self.assertEqual(uppercaser.submit('world'), 'WORLD')
817        uppercaser.stop()
818        uppercaser.join()
819
820    def test_stderr_flush(self):
821        # sys.stderr is flushed at process shutdown (issue #13812)
822        if self.TYPE == "threads":
823            self.skipTest('test not appropriate for {}'.format(self.TYPE))
824
825        testfn = os_helper.TESTFN
826        self.addCleanup(os_helper.unlink, testfn)
827        proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
828        proc.start()
829        proc.join()
830        with open(testfn, encoding="utf-8") as f:
831            err = f.read()
832            # The whole traceback was printed
833            self.assertIn("ZeroDivisionError", err)
834            self.assertIn("test_multiprocessing.py", err)
835            self.assertIn("1/0 # MARKER", err)
836
837    @classmethod
838    def _test_stderr_flush(cls, testfn):
839        fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
840        sys.stderr = open(fd, 'w', encoding="utf-8", closefd=False)
841        1/0 # MARKER
842
843
844    @classmethod
845    def _test_sys_exit(cls, reason, testfn):
846        fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
847        sys.stderr = open(fd, 'w', encoding="utf-8", closefd=False)
848        sys.exit(reason)
849
850    def test_sys_exit(self):
851        # See Issue 13854
852        if self.TYPE == 'threads':
853            self.skipTest('test not appropriate for {}'.format(self.TYPE))
854
855        testfn = os_helper.TESTFN
856        self.addCleanup(os_helper.unlink, testfn)
857
858        for reason in (
859            [1, 2, 3],
860            'ignore this',
861        ):
862            p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
863            p.daemon = True
864            p.start()
865            join_process(p)
866            self.assertEqual(p.exitcode, 1)
867
868            with open(testfn, encoding="utf-8") as f:
869                content = f.read()
870            self.assertEqual(content.rstrip(), str(reason))
871
872            os.unlink(testfn)
873
874        cases = [
875            ((True,), 1),
876            ((False,), 0),
877            ((8,), 8),
878            ((None,), 0),
879            ((), 0),
880            ]
881
882        for args, expected in cases:
883            with self.subTest(args=args):
884                p = self.Process(target=sys.exit, args=args)
885                p.daemon = True
886                p.start()
887                join_process(p)
888                self.assertEqual(p.exitcode, expected)
889
890#
891#
892#
893
894def queue_empty(q):
895    if hasattr(q, 'empty'):
896        return q.empty()
897    else:
898        return q.qsize() == 0
899
900def queue_full(q, maxsize):
901    if hasattr(q, 'full'):
902        return q.full()
903    else:
904        return q.qsize() == maxsize
905
906
907class _TestQueue(BaseTestCase):
908
909
910    @classmethod
911    def _test_put(cls, queue, child_can_start, parent_can_continue):
912        child_can_start.wait()
913        for i in range(6):
914            queue.get()
915        parent_can_continue.set()
916
917    def test_put(self):
918        MAXSIZE = 6
919        queue = self.Queue(maxsize=MAXSIZE)
920        child_can_start = self.Event()
921        parent_can_continue = self.Event()
922
923        proc = self.Process(
924            target=self._test_put,
925            args=(queue, child_can_start, parent_can_continue)
926            )
927        proc.daemon = True
928        proc.start()
929
930        self.assertEqual(queue_empty(queue), True)
931        self.assertEqual(queue_full(queue, MAXSIZE), False)
932
933        queue.put(1)
934        queue.put(2, True)
935        queue.put(3, True, None)
936        queue.put(4, False)
937        queue.put(5, False, None)
938        queue.put_nowait(6)
939
940        # the values may be in buffer but not yet in pipe so sleep a bit
941        time.sleep(DELTA)
942
943        self.assertEqual(queue_empty(queue), False)
944        self.assertEqual(queue_full(queue, MAXSIZE), True)
945
946        put = TimingWrapper(queue.put)
947        put_nowait = TimingWrapper(queue.put_nowait)
948
949        self.assertRaises(pyqueue.Full, put, 7, False)
950        self.assertTimingAlmostEqual(put.elapsed, 0)
951
952        self.assertRaises(pyqueue.Full, put, 7, False, None)
953        self.assertTimingAlmostEqual(put.elapsed, 0)
954
955        self.assertRaises(pyqueue.Full, put_nowait, 7)
956        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
957
958        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
959        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
960
961        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
962        self.assertTimingAlmostEqual(put.elapsed, 0)
963
964        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
965        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
966
967        child_can_start.set()
968        parent_can_continue.wait()
969
970        self.assertEqual(queue_empty(queue), True)
971        self.assertEqual(queue_full(queue, MAXSIZE), False)
972
973        proc.join()
974        close_queue(queue)
975
976    @classmethod
977    def _test_get(cls, queue, child_can_start, parent_can_continue):
978        child_can_start.wait()
979        #queue.put(1)
980        queue.put(2)
981        queue.put(3)
982        queue.put(4)
983        queue.put(5)
984        parent_can_continue.set()
985
986    def test_get(self):
987        queue = self.Queue()
988        child_can_start = self.Event()
989        parent_can_continue = self.Event()
990
991        proc = self.Process(
992            target=self._test_get,
993            args=(queue, child_can_start, parent_can_continue)
994            )
995        proc.daemon = True
996        proc.start()
997
998        self.assertEqual(queue_empty(queue), True)
999
1000        child_can_start.set()
1001        parent_can_continue.wait()
1002
1003        time.sleep(DELTA)
1004        self.assertEqual(queue_empty(queue), False)
1005
1006        # Hangs unexpectedly, remove for now
1007        #self.assertEqual(queue.get(), 1)
1008        self.assertEqual(queue.get(True, None), 2)
1009        self.assertEqual(queue.get(True), 3)
1010        self.assertEqual(queue.get(timeout=1), 4)
1011        self.assertEqual(queue.get_nowait(), 5)
1012
1013        self.assertEqual(queue_empty(queue), True)
1014
1015        get = TimingWrapper(queue.get)
1016        get_nowait = TimingWrapper(queue.get_nowait)
1017
1018        self.assertRaises(pyqueue.Empty, get, False)
1019        self.assertTimingAlmostEqual(get.elapsed, 0)
1020
1021        self.assertRaises(pyqueue.Empty, get, False, None)
1022        self.assertTimingAlmostEqual(get.elapsed, 0)
1023
1024        self.assertRaises(pyqueue.Empty, get_nowait)
1025        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
1026
1027        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
1028        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1029
1030        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
1031        self.assertTimingAlmostEqual(get.elapsed, 0)
1032
1033        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
1034        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
1035
1036        proc.join()
1037        close_queue(queue)
1038
1039    @classmethod
1040    def _test_fork(cls, queue):
1041        for i in range(10, 20):
1042            queue.put(i)
1043        # note that at this point the items may only be buffered, so the
1044        # process cannot shutdown until the feeder thread has finished
1045        # pushing items onto the pipe.
1046
1047    def test_fork(self):
1048        # Old versions of Queue would fail to create a new feeder
1049        # thread for a forked process if the original process had its
1050        # own feeder thread.  This test checks that this no longer
1051        # happens.
1052
1053        queue = self.Queue()
1054
1055        # put items on queue so that main process starts a feeder thread
1056        for i in range(10):
1057            queue.put(i)
1058
1059        # wait to make sure thread starts before we fork a new process
1060        time.sleep(DELTA)
1061
1062        # fork process
1063        p = self.Process(target=self._test_fork, args=(queue,))
1064        p.daemon = True
1065        p.start()
1066
1067        # check that all expected items are in the queue
1068        for i in range(20):
1069            self.assertEqual(queue.get(), i)
1070        self.assertRaises(pyqueue.Empty, queue.get, False)
1071
1072        p.join()
1073        close_queue(queue)
1074
1075    def test_qsize(self):
1076        q = self.Queue()
1077        try:
1078            self.assertEqual(q.qsize(), 0)
1079        except NotImplementedError:
1080            self.skipTest('qsize method not implemented')
1081        q.put(1)
1082        self.assertEqual(q.qsize(), 1)
1083        q.put(5)
1084        self.assertEqual(q.qsize(), 2)
1085        q.get()
1086        self.assertEqual(q.qsize(), 1)
1087        q.get()
1088        self.assertEqual(q.qsize(), 0)
1089        close_queue(q)
1090
1091    @classmethod
1092    def _test_task_done(cls, q):
1093        for obj in iter(q.get, None):
1094            time.sleep(DELTA)
1095            q.task_done()
1096
1097    def test_task_done(self):
1098        queue = self.JoinableQueue()
1099
1100        workers = [self.Process(target=self._test_task_done, args=(queue,))
1101                   for i in range(4)]
1102
1103        for p in workers:
1104            p.daemon = True
1105            p.start()
1106
1107        for i in range(10):
1108            queue.put(i)
1109
1110        queue.join()
1111
1112        for p in workers:
1113            queue.put(None)
1114
1115        for p in workers:
1116            p.join()
1117        close_queue(queue)
1118
1119    def test_no_import_lock_contention(self):
1120        with os_helper.temp_cwd():
1121            module_name = 'imported_by_an_imported_module'
1122            with open(module_name + '.py', 'w', encoding="utf-8") as f:
1123                f.write("""if 1:
1124                    import multiprocessing
1125
1126                    q = multiprocessing.Queue()
1127                    q.put('knock knock')
1128                    q.get(timeout=3)
1129                    q.close()
1130                    del q
1131                """)
1132
1133            with import_helper.DirsOnSysPath(os.getcwd()):
1134                try:
1135                    __import__(module_name)
1136                except pyqueue.Empty:
1137                    self.fail("Probable regression on import lock contention;"
1138                              " see Issue #22853")
1139
1140    def test_timeout(self):
1141        q = multiprocessing.Queue()
1142        start = time.monotonic()
1143        self.assertRaises(pyqueue.Empty, q.get, True, 0.200)
1144        delta = time.monotonic() - start
1145        # bpo-30317: Tolerate a delta of 100 ms because of the bad clock
1146        # resolution on Windows (usually 15.6 ms). x86 Windows7 3.x once
1147        # failed because the delta was only 135.8 ms.
1148        self.assertGreaterEqual(delta, 0.100)
1149        close_queue(q)
1150
1151    def test_queue_feeder_donot_stop_onexc(self):
1152        # bpo-30414: verify feeder handles exceptions correctly
1153        if self.TYPE != 'processes':
1154            self.skipTest('test not appropriate for {}'.format(self.TYPE))
1155
1156        class NotSerializable(object):
1157            def __reduce__(self):
1158                raise AttributeError
1159        with test.support.captured_stderr():
1160            q = self.Queue()
1161            q.put(NotSerializable())
1162            q.put(True)
1163            self.assertTrue(q.get(timeout=support.SHORT_TIMEOUT))
1164            close_queue(q)
1165
1166        with test.support.captured_stderr():
1167            # bpo-33078: verify that the queue size is correctly handled
1168            # on errors.
1169            q = self.Queue(maxsize=1)
1170            q.put(NotSerializable())
1171            q.put(True)
1172            try:
1173                self.assertEqual(q.qsize(), 1)
1174            except NotImplementedError:
1175                # qsize is not available on all platform as it
1176                # relies on sem_getvalue
1177                pass
1178            self.assertTrue(q.get(timeout=support.SHORT_TIMEOUT))
1179            # Check that the size of the queue is correct
1180            self.assertTrue(q.empty())
1181            close_queue(q)
1182
1183    def test_queue_feeder_on_queue_feeder_error(self):
1184        # bpo-30006: verify feeder handles exceptions using the
1185        # _on_queue_feeder_error hook.
1186        if self.TYPE != 'processes':
1187            self.skipTest('test not appropriate for {}'.format(self.TYPE))
1188
1189        class NotSerializable(object):
1190            """Mock unserializable object"""
1191            def __init__(self):
1192                self.reduce_was_called = False
1193                self.on_queue_feeder_error_was_called = False
1194
1195            def __reduce__(self):
1196                self.reduce_was_called = True
1197                raise AttributeError
1198
1199        class SafeQueue(multiprocessing.queues.Queue):
1200            """Queue with overloaded _on_queue_feeder_error hook"""
1201            @staticmethod
1202            def _on_queue_feeder_error(e, obj):
1203                if (isinstance(e, AttributeError) and
1204                        isinstance(obj, NotSerializable)):
1205                    obj.on_queue_feeder_error_was_called = True
1206
1207        not_serializable_obj = NotSerializable()
1208        # The captured_stderr reduces the noise in the test report
1209        with test.support.captured_stderr():
1210            q = SafeQueue(ctx=multiprocessing.get_context())
1211            q.put(not_serializable_obj)
1212
1213            # Verify that q is still functioning correctly
1214            q.put(True)
1215            self.assertTrue(q.get(timeout=support.SHORT_TIMEOUT))
1216
1217        # Assert that the serialization and the hook have been called correctly
1218        self.assertTrue(not_serializable_obj.reduce_was_called)
1219        self.assertTrue(not_serializable_obj.on_queue_feeder_error_was_called)
1220
1221    def test_closed_queue_put_get_exceptions(self):
1222        for q in multiprocessing.Queue(), multiprocessing.JoinableQueue():
1223            q.close()
1224            with self.assertRaisesRegex(ValueError, 'is closed'):
1225                q.put('foo')
1226            with self.assertRaisesRegex(ValueError, 'is closed'):
1227                q.get()
1228#
1229#
1230#
1231
1232class _TestLock(BaseTestCase):
1233
1234    def test_lock(self):
1235        lock = self.Lock()
1236        self.assertEqual(lock.acquire(), True)
1237        self.assertEqual(lock.acquire(False), False)
1238        self.assertEqual(lock.release(), None)
1239        self.assertRaises((ValueError, threading.ThreadError), lock.release)
1240
1241    def test_rlock(self):
1242        lock = self.RLock()
1243        self.assertEqual(lock.acquire(), True)
1244        self.assertEqual(lock.acquire(), True)
1245        self.assertEqual(lock.acquire(), True)
1246        self.assertEqual(lock.release(), None)
1247        self.assertEqual(lock.release(), None)
1248        self.assertEqual(lock.release(), None)
1249        self.assertRaises((AssertionError, RuntimeError), lock.release)
1250
1251    def test_lock_context(self):
1252        with self.Lock():
1253            pass
1254
1255
1256class _TestSemaphore(BaseTestCase):
1257
1258    def _test_semaphore(self, sem):
1259        self.assertReturnsIfImplemented(2, get_value, sem)
1260        self.assertEqual(sem.acquire(), True)
1261        self.assertReturnsIfImplemented(1, get_value, sem)
1262        self.assertEqual(sem.acquire(), True)
1263        self.assertReturnsIfImplemented(0, get_value, sem)
1264        self.assertEqual(sem.acquire(False), False)
1265        self.assertReturnsIfImplemented(0, get_value, sem)
1266        self.assertEqual(sem.release(), None)
1267        self.assertReturnsIfImplemented(1, get_value, sem)
1268        self.assertEqual(sem.release(), None)
1269        self.assertReturnsIfImplemented(2, get_value, sem)
1270
1271    def test_semaphore(self):
1272        sem = self.Semaphore(2)
1273        self._test_semaphore(sem)
1274        self.assertEqual(sem.release(), None)
1275        self.assertReturnsIfImplemented(3, get_value, sem)
1276        self.assertEqual(sem.release(), None)
1277        self.assertReturnsIfImplemented(4, get_value, sem)
1278
1279    def test_bounded_semaphore(self):
1280        sem = self.BoundedSemaphore(2)
1281        self._test_semaphore(sem)
1282        # Currently fails on OS/X
1283        #if HAVE_GETVALUE:
1284        #    self.assertRaises(ValueError, sem.release)
1285        #    self.assertReturnsIfImplemented(2, get_value, sem)
1286
1287    def test_timeout(self):
1288        if self.TYPE != 'processes':
1289            self.skipTest('test not appropriate for {}'.format(self.TYPE))
1290
1291        sem = self.Semaphore(0)
1292        acquire = TimingWrapper(sem.acquire)
1293
1294        self.assertEqual(acquire(False), False)
1295        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
1296
1297        self.assertEqual(acquire(False, None), False)
1298        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
1299
1300        self.assertEqual(acquire(False, TIMEOUT1), False)
1301        self.assertTimingAlmostEqual(acquire.elapsed, 0)
1302
1303        self.assertEqual(acquire(True, TIMEOUT2), False)
1304        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
1305
1306        self.assertEqual(acquire(timeout=TIMEOUT3), False)
1307        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
1308
1309
1310class _TestCondition(BaseTestCase):
1311
1312    @classmethod
1313    def f(cls, cond, sleeping, woken, timeout=None):
1314        cond.acquire()
1315        sleeping.release()
1316        cond.wait(timeout)
1317        woken.release()
1318        cond.release()
1319
1320    def assertReachesEventually(self, func, value):
1321        for i in range(10):
1322            try:
1323                if func() == value:
1324                    break
1325            except NotImplementedError:
1326                break
1327            time.sleep(DELTA)
1328        time.sleep(DELTA)
1329        self.assertReturnsIfImplemented(value, func)
1330
1331    def check_invariant(self, cond):
1332        # this is only supposed to succeed when there are no sleepers
1333        if self.TYPE == 'processes':
1334            try:
1335                sleepers = (cond._sleeping_count.get_value() -
1336                            cond._woken_count.get_value())
1337                self.assertEqual(sleepers, 0)
1338                self.assertEqual(cond._wait_semaphore.get_value(), 0)
1339            except NotImplementedError:
1340                pass
1341
1342    def test_notify(self):
1343        cond = self.Condition()
1344        sleeping = self.Semaphore(0)
1345        woken = self.Semaphore(0)
1346
1347        p = self.Process(target=self.f, args=(cond, sleeping, woken))
1348        p.daemon = True
1349        p.start()
1350        self.addCleanup(p.join)
1351
1352        p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
1353        p.daemon = True
1354        p.start()
1355        self.addCleanup(p.join)
1356
1357        # wait for both children to start sleeping
1358        sleeping.acquire()
1359        sleeping.acquire()
1360
1361        # check no process/thread has woken up
1362        time.sleep(DELTA)
1363        self.assertReturnsIfImplemented(0, get_value, woken)
1364
1365        # wake up one process/thread
1366        cond.acquire()
1367        cond.notify()
1368        cond.release()
1369
1370        # check one process/thread has woken up
1371        time.sleep(DELTA)
1372        self.assertReturnsIfImplemented(1, get_value, woken)
1373
1374        # wake up another
1375        cond.acquire()
1376        cond.notify()
1377        cond.release()
1378
1379        # check other has woken up
1380        time.sleep(DELTA)
1381        self.assertReturnsIfImplemented(2, get_value, woken)
1382
1383        # check state is not mucked up
1384        self.check_invariant(cond)
1385        p.join()
1386
1387    def test_notify_all(self):
1388        cond = self.Condition()
1389        sleeping = self.Semaphore(0)
1390        woken = self.Semaphore(0)
1391
1392        # start some threads/processes which will timeout
1393        for i in range(3):
1394            p = self.Process(target=self.f,
1395                             args=(cond, sleeping, woken, TIMEOUT1))
1396            p.daemon = True
1397            p.start()
1398            self.addCleanup(p.join)
1399
1400            t = threading.Thread(target=self.f,
1401                                 args=(cond, sleeping, woken, TIMEOUT1))
1402            t.daemon = True
1403            t.start()
1404            self.addCleanup(t.join)
1405
1406        # wait for them all to sleep
1407        for i in range(6):
1408            sleeping.acquire()
1409
1410        # check they have all timed out
1411        for i in range(6):
1412            woken.acquire()
1413        self.assertReturnsIfImplemented(0, get_value, woken)
1414
1415        # check state is not mucked up
1416        self.check_invariant(cond)
1417
1418        # start some more threads/processes
1419        for i in range(3):
1420            p = self.Process(target=self.f, args=(cond, sleeping, woken))
1421            p.daemon = True
1422            p.start()
1423            self.addCleanup(p.join)
1424
1425            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
1426            t.daemon = True
1427            t.start()
1428            self.addCleanup(t.join)
1429
1430        # wait for them to all sleep
1431        for i in range(6):
1432            sleeping.acquire()
1433
1434        # check no process/thread has woken up
1435        time.sleep(DELTA)
1436        self.assertReturnsIfImplemented(0, get_value, woken)
1437
1438        # wake them all up
1439        cond.acquire()
1440        cond.notify_all()
1441        cond.release()
1442
1443        # check they have all woken
1444        self.assertReachesEventually(lambda: get_value(woken), 6)
1445
1446        # check state is not mucked up
1447        self.check_invariant(cond)
1448
1449    def test_notify_n(self):
1450        cond = self.Condition()
1451        sleeping = self.Semaphore(0)
1452        woken = self.Semaphore(0)
1453
1454        # start some threads/processes
1455        for i in range(3):
1456            p = self.Process(target=self.f, args=(cond, sleeping, woken))
1457            p.daemon = True
1458            p.start()
1459            self.addCleanup(p.join)
1460
1461            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
1462            t.daemon = True
1463            t.start()
1464            self.addCleanup(t.join)
1465
1466        # wait for them to all sleep
1467        for i in range(6):
1468            sleeping.acquire()
1469
1470        # check no process/thread has woken up
1471        time.sleep(DELTA)
1472        self.assertReturnsIfImplemented(0, get_value, woken)
1473
1474        # wake some of them up
1475        cond.acquire()
1476        cond.notify(n=2)
1477        cond.release()
1478
1479        # check 2 have woken
1480        self.assertReachesEventually(lambda: get_value(woken), 2)
1481
1482        # wake the rest of them
1483        cond.acquire()
1484        cond.notify(n=4)
1485        cond.release()
1486
1487        self.assertReachesEventually(lambda: get_value(woken), 6)
1488
1489        # doesn't do anything more
1490        cond.acquire()
1491        cond.notify(n=3)
1492        cond.release()
1493
1494        self.assertReturnsIfImplemented(6, get_value, woken)
1495
1496        # check state is not mucked up
1497        self.check_invariant(cond)
1498
1499    def test_timeout(self):
1500        cond = self.Condition()
1501        wait = TimingWrapper(cond.wait)
1502        cond.acquire()
1503        res = wait(TIMEOUT1)
1504        cond.release()
1505        self.assertEqual(res, False)
1506        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
1507
1508    @classmethod
1509    def _test_waitfor_f(cls, cond, state):
1510        with cond:
1511            state.value = 0
1512            cond.notify()
1513            result = cond.wait_for(lambda : state.value==4)
1514            if not result or state.value != 4:
1515                sys.exit(1)
1516
1517    @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
1518    def test_waitfor(self):
1519        # based on test in test/lock_tests.py
1520        cond = self.Condition()
1521        state = self.Value('i', -1)
1522
1523        p = self.Process(target=self._test_waitfor_f, args=(cond, state))
1524        p.daemon = True
1525        p.start()
1526
1527        with cond:
1528            result = cond.wait_for(lambda : state.value==0)
1529            self.assertTrue(result)
1530            self.assertEqual(state.value, 0)
1531
1532        for i in range(4):
1533            time.sleep(0.01)
1534            with cond:
1535                state.value += 1
1536                cond.notify()
1537
1538        join_process(p)
1539        self.assertEqual(p.exitcode, 0)
1540
1541    @classmethod
1542    def _test_waitfor_timeout_f(cls, cond, state, success, sem):
1543        sem.release()
1544        with cond:
1545            expected = 0.1
1546            dt = time.monotonic()
1547            result = cond.wait_for(lambda : state.value==4, timeout=expected)
1548            dt = time.monotonic() - dt
1549            # borrow logic in assertTimeout() from test/lock_tests.py
1550            if not result and expected * 0.6 < dt < expected * 10.0:
1551                success.value = True
1552
1553    @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
1554    def test_waitfor_timeout(self):
1555        # based on test in test/lock_tests.py
1556        cond = self.Condition()
1557        state = self.Value('i', 0)
1558        success = self.Value('i', False)
1559        sem = self.Semaphore(0)
1560
1561        p = self.Process(target=self._test_waitfor_timeout_f,
1562                         args=(cond, state, success, sem))
1563        p.daemon = True
1564        p.start()
1565        self.assertTrue(sem.acquire(timeout=support.LONG_TIMEOUT))
1566
1567        # Only increment 3 times, so state == 4 is never reached.
1568        for i in range(3):
1569            time.sleep(0.01)
1570            with cond:
1571                state.value += 1
1572                cond.notify()
1573
1574        join_process(p)
1575        self.assertTrue(success.value)
1576
1577    @classmethod
1578    def _test_wait_result(cls, c, pid):
1579        with c:
1580            c.notify()
1581        time.sleep(1)
1582        if pid is not None:
1583            os.kill(pid, signal.SIGINT)
1584
1585    def test_wait_result(self):
1586        if isinstance(self, ProcessesMixin) and sys.platform != 'win32':
1587            pid = os.getpid()
1588        else:
1589            pid = None
1590
1591        c = self.Condition()
1592        with c:
1593            self.assertFalse(c.wait(0))
1594            self.assertFalse(c.wait(0.1))
1595
1596            p = self.Process(target=self._test_wait_result, args=(c, pid))
1597            p.start()
1598
1599            self.assertTrue(c.wait(60))
1600            if pid is not None:
1601                self.assertRaises(KeyboardInterrupt, c.wait, 60)
1602
1603            p.join()
1604
1605
1606class _TestEvent(BaseTestCase):
1607
1608    @classmethod
1609    def _test_event(cls, event):
1610        time.sleep(TIMEOUT2)
1611        event.set()
1612
1613    def test_event(self):
1614        event = self.Event()
1615        wait = TimingWrapper(event.wait)
1616
1617        # Removed temporarily, due to API shear, this does not
1618        # work with threading._Event objects. is_set == isSet
1619        self.assertEqual(event.is_set(), False)
1620
1621        # Removed, threading.Event.wait() will return the value of the __flag
1622        # instead of None. API Shear with the semaphore backed mp.Event
1623        self.assertEqual(wait(0.0), False)
1624        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
1625        self.assertEqual(wait(TIMEOUT1), False)
1626        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
1627
1628        event.set()
1629
1630        # See note above on the API differences
1631        self.assertEqual(event.is_set(), True)
1632        self.assertEqual(wait(), True)
1633        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
1634        self.assertEqual(wait(TIMEOUT1), True)
1635        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
1636        # self.assertEqual(event.is_set(), True)
1637
1638        event.clear()
1639
1640        #self.assertEqual(event.is_set(), False)
1641
1642        p = self.Process(target=self._test_event, args=(event,))
1643        p.daemon = True
1644        p.start()
1645        self.assertEqual(wait(), True)
1646        p.join()
1647
1648#
1649# Tests for Barrier - adapted from tests in test/lock_tests.py
1650#
1651
1652# Many of the tests for threading.Barrier use a list as an atomic
1653# counter: a value is appended to increment the counter, and the
1654# length of the list gives the value.  We use the class DummyList
1655# for the same purpose.
1656
1657class _DummyList(object):
1658
1659    def __init__(self):
1660        wrapper = multiprocessing.heap.BufferWrapper(struct.calcsize('i'))
1661        lock = multiprocessing.Lock()
1662        self.__setstate__((wrapper, lock))
1663        self._lengthbuf[0] = 0
1664
1665    def __setstate__(self, state):
1666        (self._wrapper, self._lock) = state
1667        self._lengthbuf = self._wrapper.create_memoryview().cast('i')
1668
1669    def __getstate__(self):
1670        return (self._wrapper, self._lock)
1671
1672    def append(self, _):
1673        with self._lock:
1674            self._lengthbuf[0] += 1
1675
1676    def __len__(self):
1677        with self._lock:
1678            return self._lengthbuf[0]
1679
1680def _wait():
1681    # A crude wait/yield function not relying on synchronization primitives.
1682    time.sleep(0.01)
1683
1684
1685class Bunch(object):
1686    """
1687    A bunch of threads.
1688    """
1689    def __init__(self, namespace, f, args, n, wait_before_exit=False):
1690        """
1691        Construct a bunch of `n` threads running the same function `f`.
1692        If `wait_before_exit` is True, the threads won't terminate until
1693        do_finish() is called.
1694        """
1695        self.f = f
1696        self.args = args
1697        self.n = n
1698        self.started = namespace.DummyList()
1699        self.finished = namespace.DummyList()
1700        self._can_exit = namespace.Event()
1701        if not wait_before_exit:
1702            self._can_exit.set()
1703
1704        threads = []
1705        for i in range(n):
1706            p = namespace.Process(target=self.task)
1707            p.daemon = True
1708            p.start()
1709            threads.append(p)
1710
1711        def finalize(threads):
1712            for p in threads:
1713                p.join()
1714
1715        self._finalizer = weakref.finalize(self, finalize, threads)
1716
1717    def task(self):
1718        pid = os.getpid()
1719        self.started.append(pid)
1720        try:
1721            self.f(*self.args)
1722        finally:
1723            self.finished.append(pid)
1724            self._can_exit.wait(30)
1725            assert self._can_exit.is_set()
1726
1727    def wait_for_started(self):
1728        while len(self.started) < self.n:
1729            _wait()
1730
1731    def wait_for_finished(self):
1732        while len(self.finished) < self.n:
1733            _wait()
1734
1735    def do_finish(self):
1736        self._can_exit.set()
1737
1738    def close(self):
1739        self._finalizer()
1740
1741
1742class AppendTrue(object):
1743    def __init__(self, obj):
1744        self.obj = obj
1745    def __call__(self):
1746        self.obj.append(True)
1747
1748
1749class _TestBarrier(BaseTestCase):
1750    """
1751    Tests for Barrier objects.
1752    """
1753    N = 5
1754    defaultTimeout = 30.0  # XXX Slow Windows buildbots need generous timeout
1755
1756    def setUp(self):
1757        self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout)
1758
1759    def tearDown(self):
1760        self.barrier.abort()
1761        self.barrier = None
1762
1763    def DummyList(self):
1764        if self.TYPE == 'threads':
1765            return []
1766        elif self.TYPE == 'manager':
1767            return self.manager.list()
1768        else:
1769            return _DummyList()
1770
1771    def run_threads(self, f, args):
1772        b = Bunch(self, f, args, self.N-1)
1773        try:
1774            f(*args)
1775            b.wait_for_finished()
1776        finally:
1777            b.close()
1778
1779    @classmethod
1780    def multipass(cls, barrier, results, n):
1781        m = barrier.parties
1782        assert m == cls.N
1783        for i in range(n):
1784            results[0].append(True)
1785            assert len(results[1]) == i * m
1786            barrier.wait()
1787            results[1].append(True)
1788            assert len(results[0]) == (i + 1) * m
1789            barrier.wait()
1790        try:
1791            assert barrier.n_waiting == 0
1792        except NotImplementedError:
1793            pass
1794        assert not barrier.broken
1795
1796    def test_barrier(self, passes=1):
1797        """
1798        Test that a barrier is passed in lockstep
1799        """
1800        results = [self.DummyList(), self.DummyList()]
1801        self.run_threads(self.multipass, (self.barrier, results, passes))
1802
1803    def test_barrier_10(self):
1804        """
1805        Test that a barrier works for 10 consecutive runs
1806        """
1807        return self.test_barrier(10)
1808
1809    @classmethod
1810    def _test_wait_return_f(cls, barrier, queue):
1811        res = barrier.wait()
1812        queue.put(res)
1813
1814    def test_wait_return(self):
1815        """
1816        test the return value from barrier.wait
1817        """
1818        queue = self.Queue()
1819        self.run_threads(self._test_wait_return_f, (self.barrier, queue))
1820        results = [queue.get() for i in range(self.N)]
1821        self.assertEqual(results.count(0), 1)
1822        close_queue(queue)
1823
1824    @classmethod
1825    def _test_action_f(cls, barrier, results):
1826        barrier.wait()
1827        if len(results) != 1:
1828            raise RuntimeError
1829
1830    def test_action(self):
1831        """
1832        Test the 'action' callback
1833        """
1834        results = self.DummyList()
1835        barrier = self.Barrier(self.N, action=AppendTrue(results))
1836        self.run_threads(self._test_action_f, (barrier, results))
1837        self.assertEqual(len(results), 1)
1838
1839    @classmethod
1840    def _test_abort_f(cls, barrier, results1, results2):
1841        try:
1842            i = barrier.wait()
1843            if i == cls.N//2:
1844                raise RuntimeError
1845            barrier.wait()
1846            results1.append(True)
1847        except threading.BrokenBarrierError:
1848            results2.append(True)
1849        except RuntimeError:
1850            barrier.abort()
1851
1852    def test_abort(self):
1853        """
1854        Test that an abort will put the barrier in a broken state
1855        """
1856        results1 = self.DummyList()
1857        results2 = self.DummyList()
1858        self.run_threads(self._test_abort_f,
1859                         (self.barrier, results1, results2))
1860        self.assertEqual(len(results1), 0)
1861        self.assertEqual(len(results2), self.N-1)
1862        self.assertTrue(self.barrier.broken)
1863
1864    @classmethod
1865    def _test_reset_f(cls, barrier, results1, results2, results3):
1866        i = barrier.wait()
1867        if i == cls.N//2:
1868            # Wait until the other threads are all in the barrier.
1869            while barrier.n_waiting < cls.N-1:
1870                time.sleep(0.001)
1871            barrier.reset()
1872        else:
1873            try:
1874                barrier.wait()
1875                results1.append(True)
1876            except threading.BrokenBarrierError:
1877                results2.append(True)
1878        # Now, pass the barrier again
1879        barrier.wait()
1880        results3.append(True)
1881
1882    def test_reset(self):
1883        """
1884        Test that a 'reset' on a barrier frees the waiting threads
1885        """
1886        results1 = self.DummyList()
1887        results2 = self.DummyList()
1888        results3 = self.DummyList()
1889        self.run_threads(self._test_reset_f,
1890                         (self.barrier, results1, results2, results3))
1891        self.assertEqual(len(results1), 0)
1892        self.assertEqual(len(results2), self.N-1)
1893        self.assertEqual(len(results3), self.N)
1894
1895    @classmethod
1896    def _test_abort_and_reset_f(cls, barrier, barrier2,
1897                                results1, results2, results3):
1898        try:
1899            i = barrier.wait()
1900            if i == cls.N//2:
1901                raise RuntimeError
1902            barrier.wait()
1903            results1.append(True)
1904        except threading.BrokenBarrierError:
1905            results2.append(True)
1906        except RuntimeError:
1907            barrier.abort()
1908        # Synchronize and reset the barrier.  Must synchronize first so
1909        # that everyone has left it when we reset, and after so that no
1910        # one enters it before the reset.
1911        if barrier2.wait() == cls.N//2:
1912            barrier.reset()
1913        barrier2.wait()
1914        barrier.wait()
1915        results3.append(True)
1916
1917    def test_abort_and_reset(self):
1918        """
1919        Test that a barrier can be reset after being broken.
1920        """
1921        results1 = self.DummyList()
1922        results2 = self.DummyList()
1923        results3 = self.DummyList()
1924        barrier2 = self.Barrier(self.N)
1925
1926        self.run_threads(self._test_abort_and_reset_f,
1927                         (self.barrier, barrier2, results1, results2, results3))
1928        self.assertEqual(len(results1), 0)
1929        self.assertEqual(len(results2), self.N-1)
1930        self.assertEqual(len(results3), self.N)
1931
1932    @classmethod
1933    def _test_timeout_f(cls, barrier, results):
1934        i = barrier.wait()
1935        if i == cls.N//2:
1936            # One thread is late!
1937            time.sleep(1.0)
1938        try:
1939            barrier.wait(0.5)
1940        except threading.BrokenBarrierError:
1941            results.append(True)
1942
1943    def test_timeout(self):
1944        """
1945        Test wait(timeout)
1946        """
1947        results = self.DummyList()
1948        self.run_threads(self._test_timeout_f, (self.barrier, results))
1949        self.assertEqual(len(results), self.barrier.parties)
1950
1951    @classmethod
1952    def _test_default_timeout_f(cls, barrier, results):
1953        i = barrier.wait(cls.defaultTimeout)
1954        if i == cls.N//2:
1955            # One thread is later than the default timeout
1956            time.sleep(1.0)
1957        try:
1958            barrier.wait()
1959        except threading.BrokenBarrierError:
1960            results.append(True)
1961
1962    def test_default_timeout(self):
1963        """
1964        Test the barrier's default timeout
1965        """
1966        barrier = self.Barrier(self.N, timeout=0.5)
1967        results = self.DummyList()
1968        self.run_threads(self._test_default_timeout_f, (barrier, results))
1969        self.assertEqual(len(results), barrier.parties)
1970
1971    def test_single_thread(self):
1972        b = self.Barrier(1)
1973        b.wait()
1974        b.wait()
1975
1976    @classmethod
1977    def _test_thousand_f(cls, barrier, passes, conn, lock):
1978        for i in range(passes):
1979            barrier.wait()
1980            with lock:
1981                conn.send(i)
1982
1983    def test_thousand(self):
1984        if self.TYPE == 'manager':
1985            self.skipTest('test not appropriate for {}'.format(self.TYPE))
1986        passes = 1000
1987        lock = self.Lock()
1988        conn, child_conn = self.Pipe(False)
1989        for j in range(self.N):
1990            p = self.Process(target=self._test_thousand_f,
1991                           args=(self.barrier, passes, child_conn, lock))
1992            p.start()
1993            self.addCleanup(p.join)
1994
1995        for i in range(passes):
1996            for j in range(self.N):
1997                self.assertEqual(conn.recv(), i)
1998
1999#
2000#
2001#
2002
2003class _TestValue(BaseTestCase):
2004
2005    ALLOWED_TYPES = ('processes',)
2006
2007    codes_values = [
2008        ('i', 4343, 24234),
2009        ('d', 3.625, -4.25),
2010        ('h', -232, 234),
2011        ('q', 2 ** 33, 2 ** 34),
2012        ('c', latin('x'), latin('y'))
2013        ]
2014
2015    def setUp(self):
2016        if not HAS_SHAREDCTYPES:
2017            self.skipTest("requires multiprocessing.sharedctypes")
2018
2019    @classmethod
2020    def _test(cls, values):
2021        for sv, cv in zip(values, cls.codes_values):
2022            sv.value = cv[2]
2023
2024
2025    def test_value(self, raw=False):
2026        if raw:
2027            values = [self.RawValue(code, value)
2028                      for code, value, _ in self.codes_values]
2029        else:
2030            values = [self.Value(code, value)
2031                      for code, value, _ in self.codes_values]
2032
2033        for sv, cv in zip(values, self.codes_values):
2034            self.assertEqual(sv.value, cv[1])
2035
2036        proc = self.Process(target=self._test, args=(values,))
2037        proc.daemon = True
2038        proc.start()
2039        proc.join()
2040
2041        for sv, cv in zip(values, self.codes_values):
2042            self.assertEqual(sv.value, cv[2])
2043
2044    def test_rawvalue(self):
2045        self.test_value(raw=True)
2046
2047    def test_getobj_getlock(self):
2048        val1 = self.Value('i', 5)
2049        lock1 = val1.get_lock()
2050        obj1 = val1.get_obj()
2051
2052        val2 = self.Value('i', 5, lock=None)
2053        lock2 = val2.get_lock()
2054        obj2 = val2.get_obj()
2055
2056        lock = self.Lock()
2057        val3 = self.Value('i', 5, lock=lock)
2058        lock3 = val3.get_lock()
2059        obj3 = val3.get_obj()
2060        self.assertEqual(lock, lock3)
2061
2062        arr4 = self.Value('i', 5, lock=False)
2063        self.assertFalse(hasattr(arr4, 'get_lock'))
2064        self.assertFalse(hasattr(arr4, 'get_obj'))
2065
2066        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
2067
2068        arr5 = self.RawValue('i', 5)
2069        self.assertFalse(hasattr(arr5, 'get_lock'))
2070        self.assertFalse(hasattr(arr5, 'get_obj'))
2071
2072
2073class _TestArray(BaseTestCase):
2074
2075    ALLOWED_TYPES = ('processes',)
2076
2077    @classmethod
2078    def f(cls, seq):
2079        for i in range(1, len(seq)):
2080            seq[i] += seq[i-1]
2081
2082    @unittest.skipIf(c_int is None, "requires _ctypes")
2083    def test_array(self, raw=False):
2084        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
2085        if raw:
2086            arr = self.RawArray('i', seq)
2087        else:
2088            arr = self.Array('i', seq)
2089
2090        self.assertEqual(len(arr), len(seq))
2091        self.assertEqual(arr[3], seq[3])
2092        self.assertEqual(list(arr[2:7]), list(seq[2:7]))
2093
2094        arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
2095
2096        self.assertEqual(list(arr[:]), seq)
2097
2098        self.f(seq)
2099
2100        p = self.Process(target=self.f, args=(arr,))
2101        p.daemon = True
2102        p.start()
2103        p.join()
2104
2105        self.assertEqual(list(arr[:]), seq)
2106
2107    @unittest.skipIf(c_int is None, "requires _ctypes")
2108    def test_array_from_size(self):
2109        size = 10
2110        # Test for zeroing (see issue #11675).
2111        # The repetition below strengthens the test by increasing the chances
2112        # of previously allocated non-zero memory being used for the new array
2113        # on the 2nd and 3rd loops.
2114        for _ in range(3):
2115            arr = self.Array('i', size)
2116            self.assertEqual(len(arr), size)
2117            self.assertEqual(list(arr), [0] * size)
2118            arr[:] = range(10)
2119            self.assertEqual(list(arr), list(range(10)))
2120            del arr
2121
2122    @unittest.skipIf(c_int is None, "requires _ctypes")
2123    def test_rawarray(self):
2124        self.test_array(raw=True)
2125
2126    @unittest.skipIf(c_int is None, "requires _ctypes")
2127    def test_getobj_getlock_obj(self):
2128        arr1 = self.Array('i', list(range(10)))
2129        lock1 = arr1.get_lock()
2130        obj1 = arr1.get_obj()
2131
2132        arr2 = self.Array('i', list(range(10)), lock=None)
2133        lock2 = arr2.get_lock()
2134        obj2 = arr2.get_obj()
2135
2136        lock = self.Lock()
2137        arr3 = self.Array('i', list(range(10)), lock=lock)
2138        lock3 = arr3.get_lock()
2139        obj3 = arr3.get_obj()
2140        self.assertEqual(lock, lock3)
2141
2142        arr4 = self.Array('i', range(10), lock=False)
2143        self.assertFalse(hasattr(arr4, 'get_lock'))
2144        self.assertFalse(hasattr(arr4, 'get_obj'))
2145        self.assertRaises(AttributeError,
2146                          self.Array, 'i', range(10), lock='notalock')
2147
2148        arr5 = self.RawArray('i', range(10))
2149        self.assertFalse(hasattr(arr5, 'get_lock'))
2150        self.assertFalse(hasattr(arr5, 'get_obj'))
2151
2152#
2153#
2154#
2155
2156class _TestContainers(BaseTestCase):
2157
2158    ALLOWED_TYPES = ('manager',)
2159
2160    def test_list(self):
2161        a = self.list(list(range(10)))
2162        self.assertEqual(a[:], list(range(10)))
2163
2164        b = self.list()
2165        self.assertEqual(b[:], [])
2166
2167        b.extend(list(range(5)))
2168        self.assertEqual(b[:], list(range(5)))
2169
2170        self.assertEqual(b[2], 2)
2171        self.assertEqual(b[2:10], [2,3,4])
2172
2173        b *= 2
2174        self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
2175
2176        self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
2177
2178        self.assertEqual(a[:], list(range(10)))
2179
2180        d = [a, b]
2181        e = self.list(d)
2182        self.assertEqual(
2183            [element[:] for element in e],
2184            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
2185            )
2186
2187        f = self.list([a])
2188        a.append('hello')
2189        self.assertEqual(f[0][:], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello'])
2190
2191    def test_list_iter(self):
2192        a = self.list(list(range(10)))
2193        it = iter(a)
2194        self.assertEqual(list(it), list(range(10)))
2195        self.assertEqual(list(it), [])  # exhausted
2196        # list modified during iteration
2197        it = iter(a)
2198        a[0] = 100
2199        self.assertEqual(next(it), 100)
2200
2201    def test_list_proxy_in_list(self):
2202        a = self.list([self.list(range(3)) for _i in range(3)])
2203        self.assertEqual([inner[:] for inner in a], [[0, 1, 2]] * 3)
2204
2205        a[0][-1] = 55
2206        self.assertEqual(a[0][:], [0, 1, 55])
2207        for i in range(1, 3):
2208            self.assertEqual(a[i][:], [0, 1, 2])
2209
2210        self.assertEqual(a[1].pop(), 2)
2211        self.assertEqual(len(a[1]), 2)
2212        for i in range(0, 3, 2):
2213            self.assertEqual(len(a[i]), 3)
2214
2215        del a
2216
2217        b = self.list()
2218        b.append(b)
2219        del b
2220
2221    def test_dict(self):
2222        d = self.dict()
2223        indices = list(range(65, 70))
2224        for i in indices:
2225            d[i] = chr(i)
2226        self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
2227        self.assertEqual(sorted(d.keys()), indices)
2228        self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
2229        self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
2230
2231    def test_dict_iter(self):
2232        d = self.dict()
2233        indices = list(range(65, 70))
2234        for i in indices:
2235            d[i] = chr(i)
2236        it = iter(d)
2237        self.assertEqual(list(it), indices)
2238        self.assertEqual(list(it), [])  # exhausted
2239        # dictionary changed size during iteration
2240        it = iter(d)
2241        d.clear()
2242        self.assertRaises(RuntimeError, next, it)
2243
2244    def test_dict_proxy_nested(self):
2245        pets = self.dict(ferrets=2, hamsters=4)
2246        supplies = self.dict(water=10, feed=3)
2247        d = self.dict(pets=pets, supplies=supplies)
2248
2249        self.assertEqual(supplies['water'], 10)
2250        self.assertEqual(d['supplies']['water'], 10)
2251
2252        d['supplies']['blankets'] = 5
2253        self.assertEqual(supplies['blankets'], 5)
2254        self.assertEqual(d['supplies']['blankets'], 5)
2255
2256        d['supplies']['water'] = 7
2257        self.assertEqual(supplies['water'], 7)
2258        self.assertEqual(d['supplies']['water'], 7)
2259
2260        del pets
2261        del supplies
2262        self.assertEqual(d['pets']['ferrets'], 2)
2263        d['supplies']['blankets'] = 11
2264        self.assertEqual(d['supplies']['blankets'], 11)
2265
2266        pets = d['pets']
2267        supplies = d['supplies']
2268        supplies['water'] = 7
2269        self.assertEqual(supplies['water'], 7)
2270        self.assertEqual(d['supplies']['water'], 7)
2271
2272        d.clear()
2273        self.assertEqual(len(d), 0)
2274        self.assertEqual(supplies['water'], 7)
2275        self.assertEqual(pets['hamsters'], 4)
2276
2277        l = self.list([pets, supplies])
2278        l[0]['marmots'] = 1
2279        self.assertEqual(pets['marmots'], 1)
2280        self.assertEqual(l[0]['marmots'], 1)
2281
2282        del pets
2283        del supplies
2284        self.assertEqual(l[0]['marmots'], 1)
2285
2286        outer = self.list([[88, 99], l])
2287        self.assertIsInstance(outer[0], list)  # Not a ListProxy
2288        self.assertEqual(outer[-1][-1]['feed'], 3)
2289
2290    def test_nested_queue(self):
2291        a = self.list() # Test queue inside list
2292        a.append(self.Queue())
2293        a[0].put(123)
2294        self.assertEqual(a[0].get(), 123)
2295        b = self.dict() # Test queue inside dict
2296        b[0] = self.Queue()
2297        b[0].put(456)
2298        self.assertEqual(b[0].get(), 456)
2299
2300    def test_namespace(self):
2301        n = self.Namespace()
2302        n.name = 'Bob'
2303        n.job = 'Builder'
2304        n._hidden = 'hidden'
2305        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
2306        del n.job
2307        self.assertEqual(str(n), "Namespace(name='Bob')")
2308        self.assertTrue(hasattr(n, 'name'))
2309        self.assertTrue(not hasattr(n, 'job'))
2310
2311#
2312#
2313#
2314
2315def sqr(x, wait=0.0):
2316    time.sleep(wait)
2317    return x*x
2318
2319def mul(x, y):
2320    return x*y
2321
2322def raise_large_valuerror(wait):
2323    time.sleep(wait)
2324    raise ValueError("x" * 1024**2)
2325
2326def identity(x):
2327    return x
2328
2329class CountedObject(object):
2330    n_instances = 0
2331
2332    def __new__(cls):
2333        cls.n_instances += 1
2334        return object.__new__(cls)
2335
2336    def __del__(self):
2337        type(self).n_instances -= 1
2338
2339class SayWhenError(ValueError): pass
2340
2341def exception_throwing_generator(total, when):
2342    if when == -1:
2343        raise SayWhenError("Somebody said when")
2344    for i in range(total):
2345        if i == when:
2346            raise SayWhenError("Somebody said when")
2347        yield i
2348
2349
2350class _TestPool(BaseTestCase):
2351
2352    @classmethod
2353    def setUpClass(cls):
2354        super().setUpClass()
2355        cls.pool = cls.Pool(4)
2356
2357    @classmethod
2358    def tearDownClass(cls):
2359        cls.pool.terminate()
2360        cls.pool.join()
2361        cls.pool = None
2362        super().tearDownClass()
2363
2364    def test_apply(self):
2365        papply = self.pool.apply
2366        self.assertEqual(papply(sqr, (5,)), sqr(5))
2367        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
2368
2369    def test_map(self):
2370        pmap = self.pool.map
2371        self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
2372        self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
2373                         list(map(sqr, list(range(100)))))
2374
2375    def test_starmap(self):
2376        psmap = self.pool.starmap
2377        tuples = list(zip(range(10), range(9,-1, -1)))
2378        self.assertEqual(psmap(mul, tuples),
2379                         list(itertools.starmap(mul, tuples)))
2380        tuples = list(zip(range(100), range(99,-1, -1)))
2381        self.assertEqual(psmap(mul, tuples, chunksize=20),
2382                         list(itertools.starmap(mul, tuples)))
2383
2384    def test_starmap_async(self):
2385        tuples = list(zip(range(100), range(99,-1, -1)))
2386        self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
2387                         list(itertools.starmap(mul, tuples)))
2388
2389    def test_map_async(self):
2390        self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(),
2391                         list(map(sqr, list(range(10)))))
2392
2393    def test_map_async_callbacks(self):
2394        call_args = self.manager.list() if self.TYPE == 'manager' else []
2395        self.pool.map_async(int, ['1'],
2396                            callback=call_args.append,
2397                            error_callback=call_args.append).wait()
2398        self.assertEqual(1, len(call_args))
2399        self.assertEqual([1], call_args[0])
2400        self.pool.map_async(int, ['a'],
2401                            callback=call_args.append,
2402                            error_callback=call_args.append).wait()
2403        self.assertEqual(2, len(call_args))
2404        self.assertIsInstance(call_args[1], ValueError)
2405
2406    def test_map_unplicklable(self):
2407        # Issue #19425 -- failure to pickle should not cause a hang
2408        if self.TYPE == 'threads':
2409            self.skipTest('test not appropriate for {}'.format(self.TYPE))
2410        class A(object):
2411            def __reduce__(self):
2412                raise RuntimeError('cannot pickle')
2413        with self.assertRaises(RuntimeError):
2414            self.pool.map(sqr, [A()]*10)
2415
2416    def test_map_chunksize(self):
2417        try:
2418            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
2419        except multiprocessing.TimeoutError:
2420            self.fail("pool.map_async with chunksize stalled on null list")
2421
2422    def test_map_handle_iterable_exception(self):
2423        if self.TYPE == 'manager':
2424            self.skipTest('test not appropriate for {}'.format(self.TYPE))
2425
2426        # SayWhenError seen at the very first of the iterable
2427        with self.assertRaises(SayWhenError):
2428            self.pool.map(sqr, exception_throwing_generator(1, -1), 1)
2429        # again, make sure it's reentrant
2430        with self.assertRaises(SayWhenError):
2431            self.pool.map(sqr, exception_throwing_generator(1, -1), 1)
2432
2433        with self.assertRaises(SayWhenError):
2434            self.pool.map(sqr, exception_throwing_generator(10, 3), 1)
2435
2436        class SpecialIterable:
2437            def __iter__(self):
2438                return self
2439            def __next__(self):
2440                raise SayWhenError
2441            def __len__(self):
2442                return 1
2443        with self.assertRaises(SayWhenError):
2444            self.pool.map(sqr, SpecialIterable(), 1)
2445        with self.assertRaises(SayWhenError):
2446            self.pool.map(sqr, SpecialIterable(), 1)
2447
2448    def test_async(self):
2449        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
2450        get = TimingWrapper(res.get)
2451        self.assertEqual(get(), 49)
2452        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
2453
2454    def test_async_timeout(self):
2455        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0))
2456        get = TimingWrapper(res.get)
2457        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
2458        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
2459
2460    def test_imap(self):
2461        it = self.pool.imap(sqr, list(range(10)))
2462        self.assertEqual(list(it), list(map(sqr, list(range(10)))))
2463
2464        it = self.pool.imap(sqr, list(range(10)))
2465        for i in range(10):
2466            self.assertEqual(next(it), i*i)
2467        self.assertRaises(StopIteration, it.__next__)
2468
2469        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
2470        for i in range(1000):
2471            self.assertEqual(next(it), i*i)
2472        self.assertRaises(StopIteration, it.__next__)
2473
2474    def test_imap_handle_iterable_exception(self):
2475        if self.TYPE == 'manager':
2476            self.skipTest('test not appropriate for {}'.format(self.TYPE))
2477
2478        # SayWhenError seen at the very first of the iterable
2479        it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1)
2480        self.assertRaises(SayWhenError, it.__next__)
2481        # again, make sure it's reentrant
2482        it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1)
2483        self.assertRaises(SayWhenError, it.__next__)
2484
2485        it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
2486        for i in range(3):
2487            self.assertEqual(next(it), i*i)
2488        self.assertRaises(SayWhenError, it.__next__)
2489
2490        # SayWhenError seen at start of problematic chunk's results
2491        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2)
2492        for i in range(6):
2493            self.assertEqual(next(it), i*i)
2494        self.assertRaises(SayWhenError, it.__next__)
2495        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4)
2496        for i in range(4):
2497            self.assertEqual(next(it), i*i)
2498        self.assertRaises(SayWhenError, it.__next__)
2499
2500    def test_imap_unordered(self):
2501        it = self.pool.imap_unordered(sqr, list(range(10)))
2502        self.assertEqual(sorted(it), list(map(sqr, list(range(10)))))
2503
2504        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=100)
2505        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
2506
2507    def test_imap_unordered_handle_iterable_exception(self):
2508        if self.TYPE == 'manager':
2509            self.skipTest('test not appropriate for {}'.format(self.TYPE))
2510
2511        # SayWhenError seen at the very first of the iterable
2512        it = self.pool.imap_unordered(sqr,
2513                                      exception_throwing_generator(1, -1),
2514                                      1)
2515        self.assertRaises(SayWhenError, it.__next__)
2516        # again, make sure it's reentrant
2517        it = self.pool.imap_unordered(sqr,
2518                                      exception_throwing_generator(1, -1),
2519                                      1)
2520        self.assertRaises(SayWhenError, it.__next__)
2521
2522        it = self.pool.imap_unordered(sqr,
2523                                      exception_throwing_generator(10, 3),
2524                                      1)
2525        expected_values = list(map(sqr, list(range(10))))
2526        with self.assertRaises(SayWhenError):
2527            # imap_unordered makes it difficult to anticipate the SayWhenError
2528            for i in range(10):
2529                value = next(it)
2530                self.assertIn(value, expected_values)
2531                expected_values.remove(value)
2532
2533        it = self.pool.imap_unordered(sqr,
2534                                      exception_throwing_generator(20, 7),
2535                                      2)
2536        expected_values = list(map(sqr, list(range(20))))
2537        with self.assertRaises(SayWhenError):
2538            for i in range(20):
2539                value = next(it)
2540                self.assertIn(value, expected_values)
2541                expected_values.remove(value)
2542
2543    def test_make_pool(self):
2544        expected_error = (RemoteError if self.TYPE == 'manager'
2545                          else ValueError)
2546
2547        self.assertRaises(expected_error, self.Pool, -1)
2548        self.assertRaises(expected_error, self.Pool, 0)
2549
2550        if self.TYPE != 'manager':
2551            p = self.Pool(3)
2552            try:
2553                self.assertEqual(3, len(p._pool))
2554            finally:
2555                p.close()
2556                p.join()
2557
2558    def test_terminate(self):
2559        result = self.pool.map_async(
2560            time.sleep, [0.1 for i in range(10000)], chunksize=1
2561            )
2562        self.pool.terminate()
2563        join = TimingWrapper(self.pool.join)
2564        join()
2565        # Sanity check the pool didn't wait for all tasks to finish
2566        self.assertLess(join.elapsed, 2.0)
2567
2568    def test_empty_iterable(self):
2569        # See Issue 12157
2570        p = self.Pool(1)
2571
2572        self.assertEqual(p.map(sqr, []), [])
2573        self.assertEqual(list(p.imap(sqr, [])), [])
2574        self.assertEqual(list(p.imap_unordered(sqr, [])), [])
2575        self.assertEqual(p.map_async(sqr, []).get(), [])
2576
2577        p.close()
2578        p.join()
2579
2580    def test_context(self):
2581        if self.TYPE == 'processes':
2582            L = list(range(10))
2583            expected = [sqr(i) for i in L]
2584            with self.Pool(2) as p:
2585                r = p.map_async(sqr, L)
2586                self.assertEqual(r.get(), expected)
2587            p.join()
2588            self.assertRaises(ValueError, p.map_async, sqr, L)
2589
2590    @classmethod
2591    def _test_traceback(cls):
2592        raise RuntimeError(123) # some comment
2593
2594    def test_traceback(self):
2595        # We want ensure that the traceback from the child process is
2596        # contained in the traceback raised in the main process.
2597        if self.TYPE == 'processes':
2598            with self.Pool(1) as p:
2599                try:
2600                    p.apply(self._test_traceback)
2601                except Exception as e:
2602                    exc = e
2603                else:
2604                    self.fail('expected RuntimeError')
2605            p.join()
2606            self.assertIs(type(exc), RuntimeError)
2607            self.assertEqual(exc.args, (123,))
2608            cause = exc.__cause__
2609            self.assertIs(type(cause), multiprocessing.pool.RemoteTraceback)
2610            self.assertIn('raise RuntimeError(123) # some comment', cause.tb)
2611
2612            with test.support.captured_stderr() as f1:
2613                try:
2614                    raise exc
2615                except RuntimeError:
2616                    sys.excepthook(*sys.exc_info())
2617            self.assertIn('raise RuntimeError(123) # some comment',
2618                          f1.getvalue())
2619            # _helper_reraises_exception should not make the error
2620            # a remote exception
2621            with self.Pool(1) as p:
2622                try:
2623                    p.map(sqr, exception_throwing_generator(1, -1), 1)
2624                except Exception as e:
2625                    exc = e
2626                else:
2627                    self.fail('expected SayWhenError')
2628                self.assertIs(type(exc), SayWhenError)
2629                self.assertIs(exc.__cause__, None)
2630            p.join()
2631
2632    @classmethod
2633    def _test_wrapped_exception(cls):
2634        raise RuntimeError('foo')
2635
2636    def test_wrapped_exception(self):
2637        # Issue #20980: Should not wrap exception when using thread pool
2638        with self.Pool(1) as p:
2639            with self.assertRaises(RuntimeError):
2640                p.apply(self._test_wrapped_exception)
2641        p.join()
2642
2643    def test_map_no_failfast(self):
2644        # Issue #23992: the fail-fast behaviour when an exception is raised
2645        # during map() would make Pool.join() deadlock, because a worker
2646        # process would fill the result queue (after the result handler thread
2647        # terminated, hence not draining it anymore).
2648
2649        t_start = time.monotonic()
2650
2651        with self.assertRaises(ValueError):
2652            with self.Pool(2) as p:
2653                try:
2654                    p.map(raise_large_valuerror, [0, 1])
2655                finally:
2656                    time.sleep(0.5)
2657                    p.close()
2658                    p.join()
2659
2660        # check that we indeed waited for all jobs
2661        self.assertGreater(time.monotonic() - t_start, 0.9)
2662
2663    def test_release_task_refs(self):
2664        # Issue #29861: task arguments and results should not be kept
2665        # alive after we are done with them.
2666        objs = [CountedObject() for i in range(10)]
2667        refs = [weakref.ref(o) for o in objs]
2668        self.pool.map(identity, objs)
2669
2670        del objs
2671        gc.collect()  # For PyPy or other GCs.
2672        time.sleep(DELTA)  # let threaded cleanup code run
2673        self.assertEqual(set(wr() for wr in refs), {None})
2674        # With a process pool, copies of the objects are returned, check
2675        # they were released too.
2676        self.assertEqual(CountedObject.n_instances, 0)
2677
2678    def test_enter(self):
2679        if self.TYPE == 'manager':
2680            self.skipTest("test not applicable to manager")
2681
2682        pool = self.Pool(1)
2683        with pool:
2684            pass
2685            # call pool.terminate()
2686        # pool is no longer running
2687
2688        with self.assertRaises(ValueError):
2689            # bpo-35477: pool.__enter__() fails if the pool is not running
2690            with pool:
2691                pass
2692        pool.join()
2693
2694    def test_resource_warning(self):
2695        if self.TYPE == 'manager':
2696            self.skipTest("test not applicable to manager")
2697
2698        pool = self.Pool(1)
2699        pool.terminate()
2700        pool.join()
2701
2702        # force state to RUN to emit ResourceWarning in __del__()
2703        pool._state = multiprocessing.pool.RUN
2704
2705        with warnings_helper.check_warnings(
2706                ('unclosed running multiprocessing pool', ResourceWarning)):
2707            pool = None
2708            support.gc_collect()
2709
2710def raising():
2711    raise KeyError("key")
2712
2713def unpickleable_result():
2714    return lambda: 42
2715
2716class _TestPoolWorkerErrors(BaseTestCase):
2717    ALLOWED_TYPES = ('processes', )
2718
2719    def test_async_error_callback(self):
2720        p = multiprocessing.Pool(2)
2721
2722        scratchpad = [None]
2723        def errback(exc):
2724            scratchpad[0] = exc
2725
2726        res = p.apply_async(raising, error_callback=errback)
2727        self.assertRaises(KeyError, res.get)
2728        self.assertTrue(scratchpad[0])
2729        self.assertIsInstance(scratchpad[0], KeyError)
2730
2731        p.close()
2732        p.join()
2733
2734    def test_unpickleable_result(self):
2735        from multiprocessing.pool import MaybeEncodingError
2736        p = multiprocessing.Pool(2)
2737
2738        # Make sure we don't lose pool processes because of encoding errors.
2739        for iteration in range(20):
2740
2741            scratchpad = [None]
2742            def errback(exc):
2743                scratchpad[0] = exc
2744
2745            res = p.apply_async(unpickleable_result, error_callback=errback)
2746            self.assertRaises(MaybeEncodingError, res.get)
2747            wrapped = scratchpad[0]
2748            self.assertTrue(wrapped)
2749            self.assertIsInstance(scratchpad[0], MaybeEncodingError)
2750            self.assertIsNotNone(wrapped.exc)
2751            self.assertIsNotNone(wrapped.value)
2752
2753        p.close()
2754        p.join()
2755
2756class _TestPoolWorkerLifetime(BaseTestCase):
2757    ALLOWED_TYPES = ('processes', )
2758
2759    def test_pool_worker_lifetime(self):
2760        p = multiprocessing.Pool(3, maxtasksperchild=10)
2761        self.assertEqual(3, len(p._pool))
2762        origworkerpids = [w.pid for w in p._pool]
2763        # Run many tasks so each worker gets replaced (hopefully)
2764        results = []
2765        for i in range(100):
2766            results.append(p.apply_async(sqr, (i, )))
2767        # Fetch the results and verify we got the right answers,
2768        # also ensuring all the tasks have completed.
2769        for (j, res) in enumerate(results):
2770            self.assertEqual(res.get(), sqr(j))
2771        # Refill the pool
2772        p._repopulate_pool()
2773        # Wait until all workers are alive
2774        # (countdown * DELTA = 5 seconds max startup process time)
2775        countdown = 50
2776        while countdown and not all(w.is_alive() for w in p._pool):
2777            countdown -= 1
2778            time.sleep(DELTA)
2779        finalworkerpids = [w.pid for w in p._pool]
2780        # All pids should be assigned.  See issue #7805.
2781        self.assertNotIn(None, origworkerpids)
2782        self.assertNotIn(None, finalworkerpids)
2783        # Finally, check that the worker pids have changed
2784        self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
2785        p.close()
2786        p.join()
2787
2788    def test_pool_worker_lifetime_early_close(self):
2789        # Issue #10332: closing a pool whose workers have limited lifetimes
2790        # before all the tasks completed would make join() hang.
2791        p = multiprocessing.Pool(3, maxtasksperchild=1)
2792        results = []
2793        for i in range(6):
2794            results.append(p.apply_async(sqr, (i, 0.3)))
2795        p.close()
2796        p.join()
2797        # check the results
2798        for (j, res) in enumerate(results):
2799            self.assertEqual(res.get(), sqr(j))
2800
2801    def test_worker_finalization_via_atexit_handler_of_multiprocessing(self):
2802        # tests cases against bpo-38744 and bpo-39360
2803        cmd = '''if 1:
2804            from multiprocessing import Pool
2805            problem = None
2806            class A:
2807                def __init__(self):
2808                    self.pool = Pool(processes=1)
2809            def test():
2810                global problem
2811                problem = A()
2812                problem.pool.map(float, tuple(range(10)))
2813            if __name__ == "__main__":
2814                test()
2815        '''
2816        rc, out, err = test.support.script_helper.assert_python_ok('-c', cmd)
2817        self.assertEqual(rc, 0)
2818
2819#
2820# Test of creating a customized manager class
2821#
2822
2823from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
2824
2825class FooBar(object):
2826    def f(self):
2827        return 'f()'
2828    def g(self):
2829        raise ValueError
2830    def _h(self):
2831        return '_h()'
2832
2833def baz():
2834    for i in range(10):
2835        yield i*i
2836
2837class IteratorProxy(BaseProxy):
2838    _exposed_ = ('__next__',)
2839    def __iter__(self):
2840        return self
2841    def __next__(self):
2842        return self._callmethod('__next__')
2843
2844class MyManager(BaseManager):
2845    pass
2846
2847MyManager.register('Foo', callable=FooBar)
2848MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
2849MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
2850
2851
2852class _TestMyManager(BaseTestCase):
2853
2854    ALLOWED_TYPES = ('manager',)
2855
2856    def test_mymanager(self):
2857        manager = MyManager()
2858        manager.start()
2859        self.common(manager)
2860        manager.shutdown()
2861
2862        # bpo-30356: BaseManager._finalize_manager() sends SIGTERM
2863        # to the manager process if it takes longer than 1 second to stop,
2864        # which happens on slow buildbots.
2865        self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM))
2866
2867    def test_mymanager_context(self):
2868        with MyManager() as manager:
2869            self.common(manager)
2870        # bpo-30356: BaseManager._finalize_manager() sends SIGTERM
2871        # to the manager process if it takes longer than 1 second to stop,
2872        # which happens on slow buildbots.
2873        self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM))
2874
2875    def test_mymanager_context_prestarted(self):
2876        manager = MyManager()
2877        manager.start()
2878        with manager:
2879            self.common(manager)
2880        self.assertEqual(manager._process.exitcode, 0)
2881
2882    def common(self, manager):
2883        foo = manager.Foo()
2884        bar = manager.Bar()
2885        baz = manager.baz()
2886
2887        foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
2888        bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
2889
2890        self.assertEqual(foo_methods, ['f', 'g'])
2891        self.assertEqual(bar_methods, ['f', '_h'])
2892
2893        self.assertEqual(foo.f(), 'f()')
2894        self.assertRaises(ValueError, foo.g)
2895        self.assertEqual(foo._callmethod('f'), 'f()')
2896        self.assertRaises(RemoteError, foo._callmethod, '_h')
2897
2898        self.assertEqual(bar.f(), 'f()')
2899        self.assertEqual(bar._h(), '_h()')
2900        self.assertEqual(bar._callmethod('f'), 'f()')
2901        self.assertEqual(bar._callmethod('_h'), '_h()')
2902
2903        self.assertEqual(list(baz), [i*i for i in range(10)])
2904
2905
2906#
2907# Test of connecting to a remote server and using xmlrpclib for serialization
2908#
2909
2910_queue = pyqueue.Queue()
2911def get_queue():
2912    return _queue
2913
2914class QueueManager(BaseManager):
2915    '''manager class used by server process'''
2916QueueManager.register('get_queue', callable=get_queue)
2917
2918class QueueManager2(BaseManager):
2919    '''manager class which specifies the same interface as QueueManager'''
2920QueueManager2.register('get_queue')
2921
2922
2923SERIALIZER = 'xmlrpclib'
2924
2925class _TestRemoteManager(BaseTestCase):
2926
2927    ALLOWED_TYPES = ('manager',)
2928    values = ['hello world', None, True, 2.25,
2929              'hall\xe5 v\xe4rlden',
2930              '\u043f\u0440\u0438\u0432\u0456\u0442 \u0441\u0432\u0456\u0442',
2931              b'hall\xe5 v\xe4rlden',
2932             ]
2933    result = values[:]
2934
2935    @classmethod
2936    def _putter(cls, address, authkey):
2937        manager = QueueManager2(
2938            address=address, authkey=authkey, serializer=SERIALIZER
2939            )
2940        manager.connect()
2941        queue = manager.get_queue()
2942        # Note that xmlrpclib will deserialize object as a list not a tuple
2943        queue.put(tuple(cls.values))
2944
2945    def test_remote(self):
2946        authkey = os.urandom(32)
2947
2948        manager = QueueManager(
2949            address=(socket_helper.HOST, 0), authkey=authkey, serializer=SERIALIZER
2950            )
2951        manager.start()
2952        self.addCleanup(manager.shutdown)
2953
2954        p = self.Process(target=self._putter, args=(manager.address, authkey))
2955        p.daemon = True
2956        p.start()
2957
2958        manager2 = QueueManager2(
2959            address=manager.address, authkey=authkey, serializer=SERIALIZER
2960            )
2961        manager2.connect()
2962        queue = manager2.get_queue()
2963
2964        self.assertEqual(queue.get(), self.result)
2965
2966        # Because we are using xmlrpclib for serialization instead of
2967        # pickle this will cause a serialization error.
2968        self.assertRaises(Exception, queue.put, time.sleep)
2969
2970        # Make queue finalizer run before the server is stopped
2971        del queue
2972
2973
2974@hashlib_helper.requires_hashdigest('md5')
2975class _TestManagerRestart(BaseTestCase):
2976
2977    @classmethod
2978    def _putter(cls, address, authkey):
2979        manager = QueueManager(
2980            address=address, authkey=authkey, serializer=SERIALIZER)
2981        manager.connect()
2982        queue = manager.get_queue()
2983        queue.put('hello world')
2984
2985    def test_rapid_restart(self):
2986        authkey = os.urandom(32)
2987        manager = QueueManager(
2988            address=(socket_helper.HOST, 0), authkey=authkey, serializer=SERIALIZER)
2989        try:
2990            srvr = manager.get_server()
2991            addr = srvr.address
2992            # Close the connection.Listener socket which gets opened as a part
2993            # of manager.get_server(). It's not needed for the test.
2994            srvr.listener.close()
2995            manager.start()
2996
2997            p = self.Process(target=self._putter, args=(manager.address, authkey))
2998            p.start()
2999            p.join()
3000            queue = manager.get_queue()
3001            self.assertEqual(queue.get(), 'hello world')
3002            del queue
3003        finally:
3004            if hasattr(manager, "shutdown"):
3005                manager.shutdown()
3006
3007        manager = QueueManager(
3008            address=addr, authkey=authkey, serializer=SERIALIZER)
3009        try:
3010            manager.start()
3011            self.addCleanup(manager.shutdown)
3012        except OSError as e:
3013            if e.errno != errno.EADDRINUSE:
3014                raise
3015            # Retry after some time, in case the old socket was lingering
3016            # (sporadic failure on buildbots)
3017            time.sleep(1.0)
3018            manager = QueueManager(
3019                address=addr, authkey=authkey, serializer=SERIALIZER)
3020            if hasattr(manager, "shutdown"):
3021                self.addCleanup(manager.shutdown)
3022
3023#
3024#
3025#
3026
3027SENTINEL = latin('')
3028
3029class _TestConnection(BaseTestCase):
3030
3031    ALLOWED_TYPES = ('processes', 'threads')
3032
3033    @classmethod
3034    def _echo(cls, conn):
3035        for msg in iter(conn.recv_bytes, SENTINEL):
3036            conn.send_bytes(msg)
3037        conn.close()
3038
3039    def test_connection(self):
3040        conn, child_conn = self.Pipe()
3041
3042        p = self.Process(target=self._echo, args=(child_conn,))
3043        p.daemon = True
3044        p.start()
3045
3046        seq = [1, 2.25, None]
3047        msg = latin('hello world')
3048        longmsg = msg * 10
3049        arr = array.array('i', list(range(4)))
3050
3051        if self.TYPE == 'processes':
3052            self.assertEqual(type(conn.fileno()), int)
3053
3054        self.assertEqual(conn.send(seq), None)
3055        self.assertEqual(conn.recv(), seq)
3056
3057        self.assertEqual(conn.send_bytes(msg), None)
3058        self.assertEqual(conn.recv_bytes(), msg)
3059
3060        if self.TYPE == 'processes':
3061            buffer = array.array('i', [0]*10)
3062            expected = list(arr) + [0] * (10 - len(arr))
3063            self.assertEqual(conn.send_bytes(arr), None)
3064            self.assertEqual(conn.recv_bytes_into(buffer),
3065                             len(arr) * buffer.itemsize)
3066            self.assertEqual(list(buffer), expected)
3067
3068            buffer = array.array('i', [0]*10)
3069            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
3070            self.assertEqual(conn.send_bytes(arr), None)
3071            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
3072                             len(arr) * buffer.itemsize)
3073            self.assertEqual(list(buffer), expected)
3074
3075            buffer = bytearray(latin(' ' * 40))
3076            self.assertEqual(conn.send_bytes(longmsg), None)
3077            try:
3078                res = conn.recv_bytes_into(buffer)
3079            except multiprocessing.BufferTooShort as e:
3080                self.assertEqual(e.args, (longmsg,))
3081            else:
3082                self.fail('expected BufferTooShort, got %s' % res)
3083
3084        poll = TimingWrapper(conn.poll)
3085
3086        self.assertEqual(poll(), False)
3087        self.assertTimingAlmostEqual(poll.elapsed, 0)
3088
3089        self.assertEqual(poll(-1), False)
3090        self.assertTimingAlmostEqual(poll.elapsed, 0)
3091
3092        self.assertEqual(poll(TIMEOUT1), False)
3093        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
3094
3095        conn.send(None)
3096        time.sleep(.1)
3097
3098        self.assertEqual(poll(TIMEOUT1), True)
3099        self.assertTimingAlmostEqual(poll.elapsed, 0)
3100
3101        self.assertEqual(conn.recv(), None)
3102
3103        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
3104        conn.send_bytes(really_big_msg)
3105        self.assertEqual(conn.recv_bytes(), really_big_msg)
3106
3107        conn.send_bytes(SENTINEL)                          # tell child to quit
3108        child_conn.close()
3109
3110        if self.TYPE == 'processes':
3111            self.assertEqual(conn.readable, True)
3112            self.assertEqual(conn.writable, True)
3113            self.assertRaises(EOFError, conn.recv)
3114            self.assertRaises(EOFError, conn.recv_bytes)
3115
3116        p.join()
3117
3118    def test_duplex_false(self):
3119        reader, writer = self.Pipe(duplex=False)
3120        self.assertEqual(writer.send(1), None)
3121        self.assertEqual(reader.recv(), 1)
3122        if self.TYPE == 'processes':
3123            self.assertEqual(reader.readable, True)
3124            self.assertEqual(reader.writable, False)
3125            self.assertEqual(writer.readable, False)
3126            self.assertEqual(writer.writable, True)
3127            self.assertRaises(OSError, reader.send, 2)
3128            self.assertRaises(OSError, writer.recv)
3129            self.assertRaises(OSError, writer.poll)
3130
3131    def test_spawn_close(self):
3132        # We test that a pipe connection can be closed by parent
3133        # process immediately after child is spawned.  On Windows this
3134        # would have sometimes failed on old versions because
3135        # child_conn would be closed before the child got a chance to
3136        # duplicate it.
3137        conn, child_conn = self.Pipe()
3138
3139        p = self.Process(target=self._echo, args=(child_conn,))
3140        p.daemon = True
3141        p.start()
3142        child_conn.close()    # this might complete before child initializes
3143
3144        msg = latin('hello')
3145        conn.send_bytes(msg)
3146        self.assertEqual(conn.recv_bytes(), msg)
3147
3148        conn.send_bytes(SENTINEL)
3149        conn.close()
3150        p.join()
3151
3152    def test_sendbytes(self):
3153        if self.TYPE != 'processes':
3154            self.skipTest('test not appropriate for {}'.format(self.TYPE))
3155
3156        msg = latin('abcdefghijklmnopqrstuvwxyz')
3157        a, b = self.Pipe()
3158
3159        a.send_bytes(msg)
3160        self.assertEqual(b.recv_bytes(), msg)
3161
3162        a.send_bytes(msg, 5)
3163        self.assertEqual(b.recv_bytes(), msg[5:])
3164
3165        a.send_bytes(msg, 7, 8)
3166        self.assertEqual(b.recv_bytes(), msg[7:7+8])
3167
3168        a.send_bytes(msg, 26)
3169        self.assertEqual(b.recv_bytes(), latin(''))
3170
3171        a.send_bytes(msg, 26, 0)
3172        self.assertEqual(b.recv_bytes(), latin(''))
3173
3174        self.assertRaises(ValueError, a.send_bytes, msg, 27)
3175
3176        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
3177
3178        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
3179
3180        self.assertRaises(ValueError, a.send_bytes, msg, -1)
3181
3182        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
3183
3184    @classmethod
3185    def _is_fd_assigned(cls, fd):
3186        try:
3187            os.fstat(fd)
3188        except OSError as e:
3189            if e.errno == errno.EBADF:
3190                return False
3191            raise
3192        else:
3193            return True
3194
3195    @classmethod
3196    def _writefd(cls, conn, data, create_dummy_fds=False):
3197        if create_dummy_fds:
3198            for i in range(0, 256):
3199                if not cls._is_fd_assigned(i):
3200                    os.dup2(conn.fileno(), i)
3201        fd = reduction.recv_handle(conn)
3202        if msvcrt:
3203            fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
3204        os.write(fd, data)
3205        os.close(fd)
3206
3207    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
3208    def test_fd_transfer(self):
3209        if self.TYPE != 'processes':
3210            self.skipTest("only makes sense with processes")
3211        conn, child_conn = self.Pipe(duplex=True)
3212
3213        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
3214        p.daemon = True
3215        p.start()
3216        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
3217        with open(os_helper.TESTFN, "wb") as f:
3218            fd = f.fileno()
3219            if msvcrt:
3220                fd = msvcrt.get_osfhandle(fd)
3221            reduction.send_handle(conn, fd, p.pid)
3222        p.join()
3223        with open(os_helper.TESTFN, "rb") as f:
3224            self.assertEqual(f.read(), b"foo")
3225
3226    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
3227    @unittest.skipIf(sys.platform == "win32",
3228                     "test semantics don't make sense on Windows")
3229    @unittest.skipIf(MAXFD <= 256,
3230                     "largest assignable fd number is too small")
3231    @unittest.skipUnless(hasattr(os, "dup2"),
3232                         "test needs os.dup2()")
3233    def test_large_fd_transfer(self):
3234        # With fd > 256 (issue #11657)
3235        if self.TYPE != 'processes':
3236            self.skipTest("only makes sense with processes")
3237        conn, child_conn = self.Pipe(duplex=True)
3238
3239        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
3240        p.daemon = True
3241        p.start()
3242        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
3243        with open(os_helper.TESTFN, "wb") as f:
3244            fd = f.fileno()
3245            for newfd in range(256, MAXFD):
3246                if not self._is_fd_assigned(newfd):
3247                    break
3248            else:
3249                self.fail("could not find an unassigned large file descriptor")
3250            os.dup2(fd, newfd)
3251            try:
3252                reduction.send_handle(conn, newfd, p.pid)
3253            finally:
3254                os.close(newfd)
3255        p.join()
3256        with open(os_helper.TESTFN, "rb") as f:
3257            self.assertEqual(f.read(), b"bar")
3258
3259    @classmethod
3260    def _send_data_without_fd(self, conn):
3261        os.write(conn.fileno(), b"\0")
3262
3263    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
3264    @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
3265    def test_missing_fd_transfer(self):
3266        # Check that exception is raised when received data is not
3267        # accompanied by a file descriptor in ancillary data.
3268        if self.TYPE != 'processes':
3269            self.skipTest("only makes sense with processes")
3270        conn, child_conn = self.Pipe(duplex=True)
3271
3272        p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
3273        p.daemon = True
3274        p.start()
3275        self.assertRaises(RuntimeError, reduction.recv_handle, conn)
3276        p.join()
3277
3278    def test_context(self):
3279        a, b = self.Pipe()
3280
3281        with a, b:
3282            a.send(1729)
3283            self.assertEqual(b.recv(), 1729)
3284            if self.TYPE == 'processes':
3285                self.assertFalse(a.closed)
3286                self.assertFalse(b.closed)
3287
3288        if self.TYPE == 'processes':
3289            self.assertTrue(a.closed)
3290            self.assertTrue(b.closed)
3291            self.assertRaises(OSError, a.recv)
3292            self.assertRaises(OSError, b.recv)
3293
3294class _TestListener(BaseTestCase):
3295
3296    ALLOWED_TYPES = ('processes',)
3297
3298    def test_multiple_bind(self):
3299        for family in self.connection.families:
3300            l = self.connection.Listener(family=family)
3301            self.addCleanup(l.close)
3302            self.assertRaises(OSError, self.connection.Listener,
3303                              l.address, family)
3304
3305    def test_context(self):
3306        with self.connection.Listener() as l:
3307            with self.connection.Client(l.address) as c:
3308                with l.accept() as d:
3309                    c.send(1729)
3310                    self.assertEqual(d.recv(), 1729)
3311
3312        if self.TYPE == 'processes':
3313            self.assertRaises(OSError, l.accept)
3314
3315    @unittest.skipUnless(util.abstract_sockets_supported,
3316                         "test needs abstract socket support")
3317    def test_abstract_socket(self):
3318        with self.connection.Listener("\0something") as listener:
3319            with self.connection.Client(listener.address) as client:
3320                with listener.accept() as d:
3321                    client.send(1729)
3322                    self.assertEqual(d.recv(), 1729)
3323
3324        if self.TYPE == 'processes':
3325            self.assertRaises(OSError, listener.accept)
3326
3327
3328class _TestListenerClient(BaseTestCase):
3329
3330    ALLOWED_TYPES = ('processes', 'threads')
3331
3332    @classmethod
3333    def _test(cls, address):
3334        conn = cls.connection.Client(address)
3335        conn.send('hello')
3336        conn.close()
3337
3338    def test_listener_client(self):
3339        for family in self.connection.families:
3340            l = self.connection.Listener(family=family)
3341            p = self.Process(target=self._test, args=(l.address,))
3342            p.daemon = True
3343            p.start()
3344            conn = l.accept()
3345            self.assertEqual(conn.recv(), 'hello')
3346            p.join()
3347            l.close()
3348
3349    def test_issue14725(self):
3350        l = self.connection.Listener()
3351        p = self.Process(target=self._test, args=(l.address,))
3352        p.daemon = True
3353        p.start()
3354        time.sleep(1)
3355        # On Windows the client process should by now have connected,
3356        # written data and closed the pipe handle by now.  This causes
3357        # ConnectNamdedPipe() to fail with ERROR_NO_DATA.  See Issue
3358        # 14725.
3359        conn = l.accept()
3360        self.assertEqual(conn.recv(), 'hello')
3361        conn.close()
3362        p.join()
3363        l.close()
3364
3365    def test_issue16955(self):
3366        for fam in self.connection.families:
3367            l = self.connection.Listener(family=fam)
3368            c = self.connection.Client(l.address)
3369            a = l.accept()
3370            a.send_bytes(b"hello")
3371            self.assertTrue(c.poll(1))
3372            a.close()
3373            c.close()
3374            l.close()
3375
3376class _TestPoll(BaseTestCase):
3377
3378    ALLOWED_TYPES = ('processes', 'threads')
3379
3380    def test_empty_string(self):
3381        a, b = self.Pipe()
3382        self.assertEqual(a.poll(), False)
3383        b.send_bytes(b'')
3384        self.assertEqual(a.poll(), True)
3385        self.assertEqual(a.poll(), True)
3386
3387    @classmethod
3388    def _child_strings(cls, conn, strings):
3389        for s in strings:
3390            time.sleep(0.1)
3391            conn.send_bytes(s)
3392        conn.close()
3393
3394    def test_strings(self):
3395        strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
3396        a, b = self.Pipe()
3397        p = self.Process(target=self._child_strings, args=(b, strings))
3398        p.start()
3399
3400        for s in strings:
3401            for i in range(200):
3402                if a.poll(0.01):
3403                    break
3404            x = a.recv_bytes()
3405            self.assertEqual(s, x)
3406
3407        p.join()
3408
3409    @classmethod
3410    def _child_boundaries(cls, r):
3411        # Polling may "pull" a message in to the child process, but we
3412        # don't want it to pull only part of a message, as that would
3413        # corrupt the pipe for any other processes which might later
3414        # read from it.
3415        r.poll(5)
3416
3417    def test_boundaries(self):
3418        r, w = self.Pipe(False)
3419        p = self.Process(target=self._child_boundaries, args=(r,))
3420        p.start()
3421        time.sleep(2)
3422        L = [b"first", b"second"]
3423        for obj in L:
3424            w.send_bytes(obj)
3425        w.close()
3426        p.join()
3427        self.assertIn(r.recv_bytes(), L)
3428
3429    @classmethod
3430    def _child_dont_merge(cls, b):
3431        b.send_bytes(b'a')
3432        b.send_bytes(b'b')
3433        b.send_bytes(b'cd')
3434
3435    def test_dont_merge(self):
3436        a, b = self.Pipe()
3437        self.assertEqual(a.poll(0.0), False)
3438        self.assertEqual(a.poll(0.1), False)
3439
3440        p = self.Process(target=self._child_dont_merge, args=(b,))
3441        p.start()
3442
3443        self.assertEqual(a.recv_bytes(), b'a')
3444        self.assertEqual(a.poll(1.0), True)
3445        self.assertEqual(a.poll(1.0), True)
3446        self.assertEqual(a.recv_bytes(), b'b')
3447        self.assertEqual(a.poll(1.0), True)
3448        self.assertEqual(a.poll(1.0), True)
3449        self.assertEqual(a.poll(0.0), True)
3450        self.assertEqual(a.recv_bytes(), b'cd')
3451
3452        p.join()
3453
3454#
3455# Test of sending connection and socket objects between processes
3456#
3457
3458@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
3459@hashlib_helper.requires_hashdigest('md5')
3460class _TestPicklingConnections(BaseTestCase):
3461
3462    ALLOWED_TYPES = ('processes',)
3463
3464    @classmethod
3465    def tearDownClass(cls):
3466        from multiprocessing import resource_sharer
3467        resource_sharer.stop(timeout=support.LONG_TIMEOUT)
3468
3469    @classmethod
3470    def _listener(cls, conn, families):
3471        for fam in families:
3472            l = cls.connection.Listener(family=fam)
3473            conn.send(l.address)
3474            new_conn = l.accept()
3475            conn.send(new_conn)
3476            new_conn.close()
3477            l.close()
3478
3479        l = socket.create_server((socket_helper.HOST, 0))
3480        conn.send(l.getsockname())
3481        new_conn, addr = l.accept()
3482        conn.send(new_conn)
3483        new_conn.close()
3484        l.close()
3485
3486        conn.recv()
3487
3488    @classmethod
3489    def _remote(cls, conn):
3490        for (address, msg) in iter(conn.recv, None):
3491            client = cls.connection.Client(address)
3492            client.send(msg.upper())
3493            client.close()
3494
3495        address, msg = conn.recv()
3496        client = socket.socket()
3497        client.connect(address)
3498        client.sendall(msg.upper())
3499        client.close()
3500
3501        conn.close()
3502
3503    def test_pickling(self):
3504        families = self.connection.families
3505
3506        lconn, lconn0 = self.Pipe()
3507        lp = self.Process(target=self._listener, args=(lconn0, families))
3508        lp.daemon = True
3509        lp.start()
3510        lconn0.close()
3511
3512        rconn, rconn0 = self.Pipe()
3513        rp = self.Process(target=self._remote, args=(rconn0,))
3514        rp.daemon = True
3515        rp.start()
3516        rconn0.close()
3517
3518        for fam in families:
3519            msg = ('This connection uses family %s' % fam).encode('ascii')
3520            address = lconn.recv()
3521            rconn.send((address, msg))
3522            new_conn = lconn.recv()
3523            self.assertEqual(new_conn.recv(), msg.upper())
3524
3525        rconn.send(None)
3526
3527        msg = latin('This connection uses a normal socket')
3528        address = lconn.recv()
3529        rconn.send((address, msg))
3530        new_conn = lconn.recv()
3531        buf = []
3532        while True:
3533            s = new_conn.recv(100)
3534            if not s:
3535                break
3536            buf.append(s)
3537        buf = b''.join(buf)
3538        self.assertEqual(buf, msg.upper())
3539        new_conn.close()
3540
3541        lconn.send(None)
3542
3543        rconn.close()
3544        lconn.close()
3545
3546        lp.join()
3547        rp.join()
3548
3549    @classmethod
3550    def child_access(cls, conn):
3551        w = conn.recv()
3552        w.send('all is well')
3553        w.close()
3554
3555        r = conn.recv()
3556        msg = r.recv()
3557        conn.send(msg*2)
3558
3559        conn.close()
3560
3561    def test_access(self):
3562        # On Windows, if we do not specify a destination pid when
3563        # using DupHandle then we need to be careful to use the
3564        # correct access flags for DuplicateHandle(), or else
3565        # DupHandle.detach() will raise PermissionError.  For example,
3566        # for a read only pipe handle we should use
3567        # access=FILE_GENERIC_READ.  (Unfortunately
3568        # DUPLICATE_SAME_ACCESS does not work.)
3569        conn, child_conn = self.Pipe()
3570        p = self.Process(target=self.child_access, args=(child_conn,))
3571        p.daemon = True
3572        p.start()
3573        child_conn.close()
3574
3575        r, w = self.Pipe(duplex=False)
3576        conn.send(w)
3577        w.close()
3578        self.assertEqual(r.recv(), 'all is well')
3579        r.close()
3580
3581        r, w = self.Pipe(duplex=False)
3582        conn.send(r)
3583        r.close()
3584        w.send('foobar')
3585        w.close()
3586        self.assertEqual(conn.recv(), 'foobar'*2)
3587
3588        p.join()
3589
3590#
3591#
3592#
3593
3594class _TestHeap(BaseTestCase):
3595
3596    ALLOWED_TYPES = ('processes',)
3597
3598    def setUp(self):
3599        super().setUp()
3600        # Make pristine heap for these tests
3601        self.old_heap = multiprocessing.heap.BufferWrapper._heap
3602        multiprocessing.heap.BufferWrapper._heap = multiprocessing.heap.Heap()
3603
3604    def tearDown(self):
3605        multiprocessing.heap.BufferWrapper._heap = self.old_heap
3606        super().tearDown()
3607
3608    def test_heap(self):
3609        iterations = 5000
3610        maxblocks = 50
3611        blocks = []
3612
3613        # get the heap object
3614        heap = multiprocessing.heap.BufferWrapper._heap
3615        heap._DISCARD_FREE_SPACE_LARGER_THAN = 0
3616
3617        # create and destroy lots of blocks of different sizes
3618        for i in range(iterations):
3619            size = int(random.lognormvariate(0, 1) * 1000)
3620            b = multiprocessing.heap.BufferWrapper(size)
3621            blocks.append(b)
3622            if len(blocks) > maxblocks:
3623                i = random.randrange(maxblocks)
3624                del blocks[i]
3625            del b
3626
3627        # verify the state of the heap
3628        with heap._lock:
3629            all = []
3630            free = 0
3631            occupied = 0
3632            for L in list(heap._len_to_seq.values()):
3633                # count all free blocks in arenas
3634                for arena, start, stop in L:
3635                    all.append((heap._arenas.index(arena), start, stop,
3636                                stop-start, 'free'))
3637                    free += (stop-start)
3638            for arena, arena_blocks in heap._allocated_blocks.items():
3639                # count all allocated blocks in arenas
3640                for start, stop in arena_blocks:
3641                    all.append((heap._arenas.index(arena), start, stop,
3642                                stop-start, 'occupied'))
3643                    occupied += (stop-start)
3644
3645            self.assertEqual(free + occupied,
3646                             sum(arena.size for arena in heap._arenas))
3647
3648            all.sort()
3649
3650            for i in range(len(all)-1):
3651                (arena, start, stop) = all[i][:3]
3652                (narena, nstart, nstop) = all[i+1][:3]
3653                if arena != narena:
3654                    # Two different arenas
3655                    self.assertEqual(stop, heap._arenas[arena].size)  # last block
3656                    self.assertEqual(nstart, 0)         # first block
3657                else:
3658                    # Same arena: two adjacent blocks
3659                    self.assertEqual(stop, nstart)
3660
3661        # test free'ing all blocks
3662        random.shuffle(blocks)
3663        while blocks:
3664            blocks.pop()
3665
3666        self.assertEqual(heap._n_frees, heap._n_mallocs)
3667        self.assertEqual(len(heap._pending_free_blocks), 0)
3668        self.assertEqual(len(heap._arenas), 0)
3669        self.assertEqual(len(heap._allocated_blocks), 0, heap._allocated_blocks)
3670        self.assertEqual(len(heap._len_to_seq), 0)
3671
3672    def test_free_from_gc(self):
3673        # Check that freeing of blocks by the garbage collector doesn't deadlock
3674        # (issue #12352).
3675        # Make sure the GC is enabled, and set lower collection thresholds to
3676        # make collections more frequent (and increase the probability of
3677        # deadlock).
3678        if not gc.isenabled():
3679            gc.enable()
3680            self.addCleanup(gc.disable)
3681        thresholds = gc.get_threshold()
3682        self.addCleanup(gc.set_threshold, *thresholds)
3683        gc.set_threshold(10)
3684
3685        # perform numerous block allocations, with cyclic references to make
3686        # sure objects are collected asynchronously by the gc
3687        for i in range(5000):
3688            a = multiprocessing.heap.BufferWrapper(1)
3689            b = multiprocessing.heap.BufferWrapper(1)
3690            # circular references
3691            a.buddy = b
3692            b.buddy = a
3693
3694#
3695#
3696#
3697
3698class _Foo(Structure):
3699    _fields_ = [
3700        ('x', c_int),
3701        ('y', c_double),
3702        ('z', c_longlong,)
3703        ]
3704
3705class _TestSharedCTypes(BaseTestCase):
3706
3707    ALLOWED_TYPES = ('processes',)
3708
3709    def setUp(self):
3710        if not HAS_SHAREDCTYPES:
3711            self.skipTest("requires multiprocessing.sharedctypes")
3712
3713    @classmethod
3714    def _double(cls, x, y, z, foo, arr, string):
3715        x.value *= 2
3716        y.value *= 2
3717        z.value *= 2
3718        foo.x *= 2
3719        foo.y *= 2
3720        string.value *= 2
3721        for i in range(len(arr)):
3722            arr[i] *= 2
3723
3724    def test_sharedctypes(self, lock=False):
3725        x = Value('i', 7, lock=lock)
3726        y = Value(c_double, 1.0/3.0, lock=lock)
3727        z = Value(c_longlong, 2 ** 33, lock=lock)
3728        foo = Value(_Foo, 3, 2, lock=lock)
3729        arr = self.Array('d', list(range(10)), lock=lock)
3730        string = self.Array('c', 20, lock=lock)
3731        string.value = latin('hello')
3732
3733        p = self.Process(target=self._double, args=(x, y, z, foo, arr, string))
3734        p.daemon = True
3735        p.start()
3736        p.join()
3737
3738        self.assertEqual(x.value, 14)
3739        self.assertAlmostEqual(y.value, 2.0/3.0)
3740        self.assertEqual(z.value, 2 ** 34)
3741        self.assertEqual(foo.x, 6)
3742        self.assertAlmostEqual(foo.y, 4.0)
3743        for i in range(10):
3744            self.assertAlmostEqual(arr[i], i*2)
3745        self.assertEqual(string.value, latin('hellohello'))
3746
3747    def test_synchronize(self):
3748        self.test_sharedctypes(lock=True)
3749
3750    def test_copy(self):
3751        foo = _Foo(2, 5.0, 2 ** 33)
3752        bar = copy(foo)
3753        foo.x = 0
3754        foo.y = 0
3755        foo.z = 0
3756        self.assertEqual(bar.x, 2)
3757        self.assertAlmostEqual(bar.y, 5.0)
3758        self.assertEqual(bar.z, 2 ** 33)
3759
3760
3761@unittest.skipUnless(HAS_SHMEM, "requires multiprocessing.shared_memory")
3762@hashlib_helper.requires_hashdigest('md5')
3763class _TestSharedMemory(BaseTestCase):
3764
3765    ALLOWED_TYPES = ('processes',)
3766
3767    @staticmethod
3768    def _attach_existing_shmem_then_write(shmem_name_or_obj, binary_data):
3769        if isinstance(shmem_name_or_obj, str):
3770            local_sms = shared_memory.SharedMemory(shmem_name_or_obj)
3771        else:
3772            local_sms = shmem_name_or_obj
3773        local_sms.buf[:len(binary_data)] = binary_data
3774        local_sms.close()
3775
3776    def _new_shm_name(self, prefix):
3777        # Add a PID to the name of a POSIX shared memory object to allow
3778        # running multiprocessing tests (test_multiprocessing_fork,
3779        # test_multiprocessing_spawn, etc) in parallel.
3780        return prefix + str(os.getpid())
3781
3782    def test_shared_memory_basics(self):
3783        name_tsmb = self._new_shm_name('test01_tsmb')
3784        sms = shared_memory.SharedMemory(name_tsmb, create=True, size=512)
3785        self.addCleanup(sms.unlink)
3786
3787        # Verify attributes are readable.
3788        self.assertEqual(sms.name, name_tsmb)
3789        self.assertGreaterEqual(sms.size, 512)
3790        self.assertGreaterEqual(len(sms.buf), sms.size)
3791
3792        # Verify __repr__
3793        self.assertIn(sms.name, str(sms))
3794        self.assertIn(str(sms.size), str(sms))
3795
3796        # Modify contents of shared memory segment through memoryview.
3797        sms.buf[0] = 42
3798        self.assertEqual(sms.buf[0], 42)
3799
3800        # Attach to existing shared memory segment.
3801        also_sms = shared_memory.SharedMemory(name_tsmb)
3802        self.assertEqual(also_sms.buf[0], 42)
3803        also_sms.close()
3804
3805        # Attach to existing shared memory segment but specify a new size.
3806        same_sms = shared_memory.SharedMemory(name_tsmb, size=20*sms.size)
3807        self.assertLess(same_sms.size, 20*sms.size)  # Size was ignored.
3808        same_sms.close()
3809
3810        # Creating Shared Memory Segment with -ve size
3811        with self.assertRaises(ValueError):
3812            shared_memory.SharedMemory(create=True, size=-2)
3813
3814        # Attaching Shared Memory Segment without a name
3815        with self.assertRaises(ValueError):
3816            shared_memory.SharedMemory(create=False)
3817
3818        # Test if shared memory segment is created properly,
3819        # when _make_filename returns an existing shared memory segment name
3820        with unittest.mock.patch(
3821            'multiprocessing.shared_memory._make_filename') as mock_make_filename:
3822
3823            NAME_PREFIX = shared_memory._SHM_NAME_PREFIX
3824            names = [self._new_shm_name('test01_fn'), self._new_shm_name('test02_fn')]
3825            # Prepend NAME_PREFIX which can be '/psm_' or 'wnsm_', necessary
3826            # because some POSIX compliant systems require name to start with /
3827            names = [NAME_PREFIX + name for name in names]
3828
3829            mock_make_filename.side_effect = names
3830            shm1 = shared_memory.SharedMemory(create=True, size=1)
3831            self.addCleanup(shm1.unlink)
3832            self.assertEqual(shm1._name, names[0])
3833
3834            mock_make_filename.side_effect = names
3835            shm2 = shared_memory.SharedMemory(create=True, size=1)
3836            self.addCleanup(shm2.unlink)
3837            self.assertEqual(shm2._name, names[1])
3838
3839        if shared_memory._USE_POSIX:
3840            # Posix Shared Memory can only be unlinked once.  Here we
3841            # test an implementation detail that is not observed across
3842            # all supported platforms (since WindowsNamedSharedMemory
3843            # manages unlinking on its own and unlink() does nothing).
3844            # True release of shared memory segment does not necessarily
3845            # happen until process exits, depending on the OS platform.
3846            name_dblunlink = self._new_shm_name('test01_dblunlink')
3847            sms_uno = shared_memory.SharedMemory(
3848                name_dblunlink,
3849                create=True,
3850                size=5000
3851            )
3852            with self.assertRaises(FileNotFoundError):
3853                try:
3854                    self.assertGreaterEqual(sms_uno.size, 5000)
3855
3856                    sms_duo = shared_memory.SharedMemory(name_dblunlink)
3857                    sms_duo.unlink()  # First shm_unlink() call.
3858                    sms_duo.close()
3859                    sms_uno.close()
3860
3861                finally:
3862                    sms_uno.unlink()  # A second shm_unlink() call is bad.
3863
3864        with self.assertRaises(FileExistsError):
3865            # Attempting to create a new shared memory segment with a
3866            # name that is already in use triggers an exception.
3867            there_can_only_be_one_sms = shared_memory.SharedMemory(
3868                name_tsmb,
3869                create=True,
3870                size=512
3871            )
3872
3873        if shared_memory._USE_POSIX:
3874            # Requesting creation of a shared memory segment with the option
3875            # to attach to an existing segment, if that name is currently in
3876            # use, should not trigger an exception.
3877            # Note:  Using a smaller size could possibly cause truncation of
3878            # the existing segment but is OS platform dependent.  In the
3879            # case of MacOS/darwin, requesting a smaller size is disallowed.
3880            class OptionalAttachSharedMemory(shared_memory.SharedMemory):
3881                _flags = os.O_CREAT | os.O_RDWR
3882            ok_if_exists_sms = OptionalAttachSharedMemory(name_tsmb)
3883            self.assertEqual(ok_if_exists_sms.size, sms.size)
3884            ok_if_exists_sms.close()
3885
3886        # Attempting to attach to an existing shared memory segment when
3887        # no segment exists with the supplied name triggers an exception.
3888        with self.assertRaises(FileNotFoundError):
3889            nonexisting_sms = shared_memory.SharedMemory('test01_notthere')
3890            nonexisting_sms.unlink()  # Error should occur on prior line.
3891
3892        sms.close()
3893
3894    def test_shared_memory_recreate(self):
3895        # Test if shared memory segment is created properly,
3896        # when _make_filename returns an existing shared memory segment name
3897        with unittest.mock.patch(
3898            'multiprocessing.shared_memory._make_filename') as mock_make_filename:
3899
3900            NAME_PREFIX = shared_memory._SHM_NAME_PREFIX
3901            names = ['test01_fn', 'test02_fn']
3902            # Prepend NAME_PREFIX which can be '/psm_' or 'wnsm_', necessary
3903            # because some POSIX compliant systems require name to start with /
3904            names = [NAME_PREFIX + name for name in names]
3905
3906            mock_make_filename.side_effect = names
3907            shm1 = shared_memory.SharedMemory(create=True, size=1)
3908            self.addCleanup(shm1.unlink)
3909            self.assertEqual(shm1._name, names[0])
3910
3911            mock_make_filename.side_effect = names
3912            shm2 = shared_memory.SharedMemory(create=True, size=1)
3913            self.addCleanup(shm2.unlink)
3914            self.assertEqual(shm2._name, names[1])
3915
3916    def test_invalid_shared_memory_cration(self):
3917        # Test creating a shared memory segment with negative size
3918        with self.assertRaises(ValueError):
3919            sms_invalid = shared_memory.SharedMemory(create=True, size=-1)
3920
3921        # Test creating a shared memory segment with size 0
3922        with self.assertRaises(ValueError):
3923            sms_invalid = shared_memory.SharedMemory(create=True, size=0)
3924
3925        # Test creating a shared memory segment without size argument
3926        with self.assertRaises(ValueError):
3927            sms_invalid = shared_memory.SharedMemory(create=True)
3928
3929    def test_shared_memory_pickle_unpickle(self):
3930        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
3931            with self.subTest(proto=proto):
3932                sms = shared_memory.SharedMemory(create=True, size=512)
3933                self.addCleanup(sms.unlink)
3934                sms.buf[0:6] = b'pickle'
3935
3936                # Test pickling
3937                pickled_sms = pickle.dumps(sms, protocol=proto)
3938
3939                # Test unpickling
3940                sms2 = pickle.loads(pickled_sms)
3941                self.assertIsInstance(sms2, shared_memory.SharedMemory)
3942                self.assertEqual(sms.name, sms2.name)
3943                self.assertEqual(bytes(sms.buf[0:6]), b'pickle')
3944                self.assertEqual(bytes(sms2.buf[0:6]), b'pickle')
3945
3946                # Test that unpickled version is still the same SharedMemory
3947                sms.buf[0:6] = b'newval'
3948                self.assertEqual(bytes(sms.buf[0:6]), b'newval')
3949                self.assertEqual(bytes(sms2.buf[0:6]), b'newval')
3950
3951                sms2.buf[0:6] = b'oldval'
3952                self.assertEqual(bytes(sms.buf[0:6]), b'oldval')
3953                self.assertEqual(bytes(sms2.buf[0:6]), b'oldval')
3954
3955    def test_shared_memory_pickle_unpickle_dead_object(self):
3956        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
3957            with self.subTest(proto=proto):
3958                sms = shared_memory.SharedMemory(create=True, size=512)
3959                sms.buf[0:6] = b'pickle'
3960                pickled_sms = pickle.dumps(sms, protocol=proto)
3961
3962                # Now, we are going to kill the original object.
3963                # So, unpickled one won't be able to attach to it.
3964                sms.close()
3965                sms.unlink()
3966
3967                with self.assertRaises(FileNotFoundError):
3968                    pickle.loads(pickled_sms)
3969
3970    def test_shared_memory_across_processes(self):
3971        # bpo-40135: don't define shared memory block's name in case of
3972        # the failure when we run multiprocessing tests in parallel.
3973        sms = shared_memory.SharedMemory(create=True, size=512)
3974        self.addCleanup(sms.unlink)
3975
3976        # Verify remote attachment to existing block by name is working.
3977        p = self.Process(
3978            target=self._attach_existing_shmem_then_write,
3979            args=(sms.name, b'howdy')
3980        )
3981        p.daemon = True
3982        p.start()
3983        p.join()
3984        self.assertEqual(bytes(sms.buf[:5]), b'howdy')
3985
3986        # Verify pickling of SharedMemory instance also works.
3987        p = self.Process(
3988            target=self._attach_existing_shmem_then_write,
3989            args=(sms, b'HELLO')
3990        )
3991        p.daemon = True
3992        p.start()
3993        p.join()
3994        self.assertEqual(bytes(sms.buf[:5]), b'HELLO')
3995
3996        sms.close()
3997
3998    @unittest.skipIf(os.name != "posix", "not feasible in non-posix platforms")
3999    def test_shared_memory_SharedMemoryServer_ignores_sigint(self):
4000        # bpo-36368: protect SharedMemoryManager server process from
4001        # KeyboardInterrupt signals.
4002        smm = multiprocessing.managers.SharedMemoryManager()
4003        smm.start()
4004
4005        # make sure the manager works properly at the beginning
4006        sl = smm.ShareableList(range(10))
4007
4008        # the manager's server should ignore KeyboardInterrupt signals, and
4009        # maintain its connection with the current process, and success when
4010        # asked to deliver memory segments.
4011        os.kill(smm._process.pid, signal.SIGINT)
4012
4013        sl2 = smm.ShareableList(range(10))
4014
4015        # test that the custom signal handler registered in the Manager does
4016        # not affect signal handling in the parent process.
4017        with self.assertRaises(KeyboardInterrupt):
4018            os.kill(os.getpid(), signal.SIGINT)
4019
4020        smm.shutdown()
4021
4022    @unittest.skipIf(os.name != "posix", "resource_tracker is posix only")
4023    def test_shared_memory_SharedMemoryManager_reuses_resource_tracker(self):
4024        # bpo-36867: test that a SharedMemoryManager uses the
4025        # same resource_tracker process as its parent.
4026        cmd = '''if 1:
4027            from multiprocessing.managers import SharedMemoryManager
4028
4029
4030            smm = SharedMemoryManager()
4031            smm.start()
4032            sl = smm.ShareableList(range(10))
4033            smm.shutdown()
4034        '''
4035        rc, out, err = test.support.script_helper.assert_python_ok('-c', cmd)
4036
4037        # Before bpo-36867 was fixed, a SharedMemoryManager not using the same
4038        # resource_tracker process as its parent would make the parent's
4039        # tracker complain about sl being leaked even though smm.shutdown()
4040        # properly released sl.
4041        self.assertFalse(err)
4042
4043    def test_shared_memory_SharedMemoryManager_basics(self):
4044        smm1 = multiprocessing.managers.SharedMemoryManager()
4045        with self.assertRaises(ValueError):
4046            smm1.SharedMemory(size=9)  # Fails if SharedMemoryServer not started
4047        smm1.start()
4048        lol = [ smm1.ShareableList(range(i)) for i in range(5, 10) ]
4049        lom = [ smm1.SharedMemory(size=j) for j in range(32, 128, 16) ]
4050        doppleganger_list0 = shared_memory.ShareableList(name=lol[0].shm.name)
4051        self.assertEqual(len(doppleganger_list0), 5)
4052        doppleganger_shm0 = shared_memory.SharedMemory(name=lom[0].name)
4053        self.assertGreaterEqual(len(doppleganger_shm0.buf), 32)
4054        held_name = lom[0].name
4055        smm1.shutdown()
4056        if sys.platform != "win32":
4057            # Calls to unlink() have no effect on Windows platform; shared
4058            # memory will only be released once final process exits.
4059            with self.assertRaises(FileNotFoundError):
4060                # No longer there to be attached to again.
4061                absent_shm = shared_memory.SharedMemory(name=held_name)
4062
4063        with multiprocessing.managers.SharedMemoryManager() as smm2:
4064            sl = smm2.ShareableList("howdy")
4065            shm = smm2.SharedMemory(size=128)
4066            held_name = sl.shm.name
4067        if sys.platform != "win32":
4068            with self.assertRaises(FileNotFoundError):
4069                # No longer there to be attached to again.
4070                absent_sl = shared_memory.ShareableList(name=held_name)
4071
4072
4073    def test_shared_memory_ShareableList_basics(self):
4074        sl = shared_memory.ShareableList(
4075            ['howdy', b'HoWdY', -273.154, 100, None, True, 42]
4076        )
4077        self.addCleanup(sl.shm.unlink)
4078
4079        # Verify __repr__
4080        self.assertIn(sl.shm.name, str(sl))
4081        self.assertIn(str(list(sl)), str(sl))
4082
4083        # Index Out of Range (get)
4084        with self.assertRaises(IndexError):
4085            sl[7]
4086
4087        # Index Out of Range (set)
4088        with self.assertRaises(IndexError):
4089            sl[7] = 2
4090
4091        # Assign value without format change (str -> str)
4092        current_format = sl._get_packing_format(0)
4093        sl[0] = 'howdy'
4094        self.assertEqual(current_format, sl._get_packing_format(0))
4095
4096        # Verify attributes are readable.
4097        self.assertEqual(sl.format, '8s8sdqxxxxxx?xxxxxxxx?q')
4098
4099        # Exercise len().
4100        self.assertEqual(len(sl), 7)
4101
4102        # Exercise index().
4103        with warnings.catch_warnings():
4104            # Suppress BytesWarning when comparing against b'HoWdY'.
4105            warnings.simplefilter('ignore')
4106            with self.assertRaises(ValueError):
4107                sl.index('100')
4108            self.assertEqual(sl.index(100), 3)
4109
4110        # Exercise retrieving individual values.
4111        self.assertEqual(sl[0], 'howdy')
4112        self.assertEqual(sl[-2], True)
4113
4114        # Exercise iterability.
4115        self.assertEqual(
4116            tuple(sl),
4117            ('howdy', b'HoWdY', -273.154, 100, None, True, 42)
4118        )
4119
4120        # Exercise modifying individual values.
4121        sl[3] = 42
4122        self.assertEqual(sl[3], 42)
4123        sl[4] = 'some'  # Change type at a given position.
4124        self.assertEqual(sl[4], 'some')
4125        self.assertEqual(sl.format, '8s8sdq8sxxxxxxx?q')
4126        with self.assertRaisesRegex(ValueError,
4127                                    "exceeds available storage"):
4128            sl[4] = 'far too many'
4129        self.assertEqual(sl[4], 'some')
4130        sl[0] = 'encodés'  # Exactly 8 bytes of UTF-8 data
4131        self.assertEqual(sl[0], 'encodés')
4132        self.assertEqual(sl[1], b'HoWdY')  # no spillage
4133        with self.assertRaisesRegex(ValueError,
4134                                    "exceeds available storage"):
4135            sl[0] = 'encodées'  # Exactly 9 bytes of UTF-8 data
4136        self.assertEqual(sl[1], b'HoWdY')
4137        with self.assertRaisesRegex(ValueError,
4138                                    "exceeds available storage"):
4139            sl[1] = b'123456789'
4140        self.assertEqual(sl[1], b'HoWdY')
4141
4142        # Exercise count().
4143        with warnings.catch_warnings():
4144            # Suppress BytesWarning when comparing against b'HoWdY'.
4145            warnings.simplefilter('ignore')
4146            self.assertEqual(sl.count(42), 2)
4147            self.assertEqual(sl.count(b'HoWdY'), 1)
4148            self.assertEqual(sl.count(b'adios'), 0)
4149
4150        # Exercise creating a duplicate.
4151        name_duplicate = self._new_shm_name('test03_duplicate')
4152        sl_copy = shared_memory.ShareableList(sl, name=name_duplicate)
4153        try:
4154            self.assertNotEqual(sl.shm.name, sl_copy.shm.name)
4155            self.assertEqual(name_duplicate, sl_copy.shm.name)
4156            self.assertEqual(list(sl), list(sl_copy))
4157            self.assertEqual(sl.format, sl_copy.format)
4158            sl_copy[-1] = 77
4159            self.assertEqual(sl_copy[-1], 77)
4160            self.assertNotEqual(sl[-1], 77)
4161            sl_copy.shm.close()
4162        finally:
4163            sl_copy.shm.unlink()
4164
4165        # Obtain a second handle on the same ShareableList.
4166        sl_tethered = shared_memory.ShareableList(name=sl.shm.name)
4167        self.assertEqual(sl.shm.name, sl_tethered.shm.name)
4168        sl_tethered[-1] = 880
4169        self.assertEqual(sl[-1], 880)
4170        sl_tethered.shm.close()
4171
4172        sl.shm.close()
4173
4174        # Exercise creating an empty ShareableList.
4175        empty_sl = shared_memory.ShareableList()
4176        try:
4177            self.assertEqual(len(empty_sl), 0)
4178            self.assertEqual(empty_sl.format, '')
4179            self.assertEqual(empty_sl.count('any'), 0)
4180            with self.assertRaises(ValueError):
4181                empty_sl.index(None)
4182            empty_sl.shm.close()
4183        finally:
4184            empty_sl.shm.unlink()
4185
4186    def test_shared_memory_ShareableList_pickling(self):
4187        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
4188            with self.subTest(proto=proto):
4189                sl = shared_memory.ShareableList(range(10))
4190                self.addCleanup(sl.shm.unlink)
4191
4192                serialized_sl = pickle.dumps(sl, protocol=proto)
4193                deserialized_sl = pickle.loads(serialized_sl)
4194                self.assertIsInstance(
4195                    deserialized_sl, shared_memory.ShareableList)
4196                self.assertEqual(deserialized_sl[-1], 9)
4197                self.assertIsNot(sl, deserialized_sl)
4198
4199                deserialized_sl[4] = "changed"
4200                self.assertEqual(sl[4], "changed")
4201                sl[3] = "newvalue"
4202                self.assertEqual(deserialized_sl[3], "newvalue")
4203
4204                larger_sl = shared_memory.ShareableList(range(400))
4205                self.addCleanup(larger_sl.shm.unlink)
4206                serialized_larger_sl = pickle.dumps(larger_sl, protocol=proto)
4207                self.assertEqual(len(serialized_sl), len(serialized_larger_sl))
4208                larger_sl.shm.close()
4209
4210                deserialized_sl.shm.close()
4211                sl.shm.close()
4212
4213    def test_shared_memory_ShareableList_pickling_dead_object(self):
4214        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
4215            with self.subTest(proto=proto):
4216                sl = shared_memory.ShareableList(range(10))
4217                serialized_sl = pickle.dumps(sl, protocol=proto)
4218
4219                # Now, we are going to kill the original object.
4220                # So, unpickled one won't be able to attach to it.
4221                sl.shm.close()
4222                sl.shm.unlink()
4223
4224                with self.assertRaises(FileNotFoundError):
4225                    pickle.loads(serialized_sl)
4226
4227    def test_shared_memory_cleaned_after_process_termination(self):
4228        cmd = '''if 1:
4229            import os, time, sys
4230            from multiprocessing import shared_memory
4231
4232            # Create a shared_memory segment, and send the segment name
4233            sm = shared_memory.SharedMemory(create=True, size=10)
4234            sys.stdout.write(sm.name + '\\n')
4235            sys.stdout.flush()
4236            time.sleep(100)
4237        '''
4238        with subprocess.Popen([sys.executable, '-E', '-c', cmd],
4239                              stdout=subprocess.PIPE,
4240                              stderr=subprocess.PIPE) as p:
4241            name = p.stdout.readline().strip().decode()
4242
4243            # killing abruptly processes holding reference to a shared memory
4244            # segment should not leak the given memory segment.
4245            p.terminate()
4246            p.wait()
4247
4248            deadline = time.monotonic() + support.LONG_TIMEOUT
4249            t = 0.1
4250            while time.monotonic() < deadline:
4251                time.sleep(t)
4252                t = min(t*2, 5)
4253                try:
4254                    smm = shared_memory.SharedMemory(name, create=False)
4255                except FileNotFoundError:
4256                    break
4257            else:
4258                raise AssertionError("A SharedMemory segment was leaked after"
4259                                     " a process was abruptly terminated.")
4260
4261            if os.name == 'posix':
4262                # Without this line it was raising warnings like:
4263                #   UserWarning: resource_tracker:
4264                #   There appear to be 1 leaked shared_memory
4265                #   objects to clean up at shutdown
4266                # See: https://bugs.python.org/issue45209
4267                resource_tracker.unregister(f"/{name}", "shared_memory")
4268
4269                # A warning was emitted by the subprocess' own
4270                # resource_tracker (on Windows, shared memory segments
4271                # are released automatically by the OS).
4272                err = p.stderr.read().decode()
4273                self.assertIn(
4274                    "resource_tracker: There appear to be 1 leaked "
4275                    "shared_memory objects to clean up at shutdown", err)
4276
4277#
4278# Test to verify that `Finalize` works.
4279#
4280
4281class _TestFinalize(BaseTestCase):
4282
4283    ALLOWED_TYPES = ('processes',)
4284
4285    def setUp(self):
4286        self.registry_backup = util._finalizer_registry.copy()
4287        util._finalizer_registry.clear()
4288
4289    def tearDown(self):
4290        gc.collect()  # For PyPy or other GCs.
4291        self.assertFalse(util._finalizer_registry)
4292        util._finalizer_registry.update(self.registry_backup)
4293
4294    @classmethod
4295    def _test_finalize(cls, conn):
4296        class Foo(object):
4297            pass
4298
4299        a = Foo()
4300        util.Finalize(a, conn.send, args=('a',))
4301        del a           # triggers callback for a
4302        gc.collect()  # For PyPy or other GCs.
4303
4304        b = Foo()
4305        close_b = util.Finalize(b, conn.send, args=('b',))
4306        close_b()       # triggers callback for b
4307        close_b()       # does nothing because callback has already been called
4308        del b           # does nothing because callback has already been called
4309        gc.collect()  # For PyPy or other GCs.
4310
4311        c = Foo()
4312        util.Finalize(c, conn.send, args=('c',))
4313
4314        d10 = Foo()
4315        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
4316
4317        d01 = Foo()
4318        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
4319        d02 = Foo()
4320        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
4321        d03 = Foo()
4322        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
4323
4324        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
4325
4326        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
4327
4328        # call multiprocessing's cleanup function then exit process without
4329        # garbage collecting locals
4330        util._exit_function()
4331        conn.close()
4332        os._exit(0)
4333
4334    def test_finalize(self):
4335        conn, child_conn = self.Pipe()
4336
4337        p = self.Process(target=self._test_finalize, args=(child_conn,))
4338        p.daemon = True
4339        p.start()
4340        p.join()
4341
4342        result = [obj for obj in iter(conn.recv, 'STOP')]
4343        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
4344
4345    def test_thread_safety(self):
4346        # bpo-24484: _run_finalizers() should be thread-safe
4347        def cb():
4348            pass
4349
4350        class Foo(object):
4351            def __init__(self):
4352                self.ref = self  # create reference cycle
4353                # insert finalizer at random key
4354                util.Finalize(self, cb, exitpriority=random.randint(1, 100))
4355
4356        finish = False
4357        exc = None
4358
4359        def run_finalizers():
4360            nonlocal exc
4361            while not finish:
4362                time.sleep(random.random() * 1e-1)
4363                try:
4364                    # A GC run will eventually happen during this,
4365                    # collecting stale Foo's and mutating the registry
4366                    util._run_finalizers()
4367                except Exception as e:
4368                    exc = e
4369
4370        def make_finalizers():
4371            nonlocal exc
4372            d = {}
4373            while not finish:
4374                try:
4375                    # Old Foo's get gradually replaced and later
4376                    # collected by the GC (because of the cyclic ref)
4377                    d[random.getrandbits(5)] = {Foo() for i in range(10)}
4378                except Exception as e:
4379                    exc = e
4380                    d.clear()
4381
4382        old_interval = sys.getswitchinterval()
4383        old_threshold = gc.get_threshold()
4384        try:
4385            sys.setswitchinterval(1e-6)
4386            gc.set_threshold(5, 5, 5)
4387            threads = [threading.Thread(target=run_finalizers),
4388                       threading.Thread(target=make_finalizers)]
4389            with threading_helper.start_threads(threads):
4390                time.sleep(4.0)  # Wait a bit to trigger race condition
4391                finish = True
4392            if exc is not None:
4393                raise exc
4394        finally:
4395            sys.setswitchinterval(old_interval)
4396            gc.set_threshold(*old_threshold)
4397            gc.collect()  # Collect remaining Foo's
4398
4399
4400#
4401# Test that from ... import * works for each module
4402#
4403
4404class _TestImportStar(unittest.TestCase):
4405
4406    def get_module_names(self):
4407        import glob
4408        folder = os.path.dirname(multiprocessing.__file__)
4409        pattern = os.path.join(glob.escape(folder), '*.py')
4410        files = glob.glob(pattern)
4411        modules = [os.path.splitext(os.path.split(f)[1])[0] for f in files]
4412        modules = ['multiprocessing.' + m for m in modules]
4413        modules.remove('multiprocessing.__init__')
4414        modules.append('multiprocessing')
4415        return modules
4416
4417    def test_import(self):
4418        modules = self.get_module_names()
4419        if sys.platform == 'win32':
4420            modules.remove('multiprocessing.popen_fork')
4421            modules.remove('multiprocessing.popen_forkserver')
4422            modules.remove('multiprocessing.popen_spawn_posix')
4423        else:
4424            modules.remove('multiprocessing.popen_spawn_win32')
4425            if not HAS_REDUCTION:
4426                modules.remove('multiprocessing.popen_forkserver')
4427
4428        if c_int is None:
4429            # This module requires _ctypes
4430            modules.remove('multiprocessing.sharedctypes')
4431
4432        for name in modules:
4433            __import__(name)
4434            mod = sys.modules[name]
4435            self.assertTrue(hasattr(mod, '__all__'), name)
4436
4437            for attr in mod.__all__:
4438                self.assertTrue(
4439                    hasattr(mod, attr),
4440                    '%r does not have attribute %r' % (mod, attr)
4441                    )
4442
4443#
4444# Quick test that logging works -- does not test logging output
4445#
4446
4447class _TestLogging(BaseTestCase):
4448
4449    ALLOWED_TYPES = ('processes',)
4450
4451    def test_enable_logging(self):
4452        logger = multiprocessing.get_logger()
4453        logger.setLevel(util.SUBWARNING)
4454        self.assertTrue(logger is not None)
4455        logger.debug('this will not be printed')
4456        logger.info('nor will this')
4457        logger.setLevel(LOG_LEVEL)
4458
4459    @classmethod
4460    def _test_level(cls, conn):
4461        logger = multiprocessing.get_logger()
4462        conn.send(logger.getEffectiveLevel())
4463
4464    def test_level(self):
4465        LEVEL1 = 32
4466        LEVEL2 = 37
4467
4468        logger = multiprocessing.get_logger()
4469        root_logger = logging.getLogger()
4470        root_level = root_logger.level
4471
4472        reader, writer = multiprocessing.Pipe(duplex=False)
4473
4474        logger.setLevel(LEVEL1)
4475        p = self.Process(target=self._test_level, args=(writer,))
4476        p.start()
4477        self.assertEqual(LEVEL1, reader.recv())
4478        p.join()
4479        p.close()
4480
4481        logger.setLevel(logging.NOTSET)
4482        root_logger.setLevel(LEVEL2)
4483        p = self.Process(target=self._test_level, args=(writer,))
4484        p.start()
4485        self.assertEqual(LEVEL2, reader.recv())
4486        p.join()
4487        p.close()
4488
4489        root_logger.setLevel(root_level)
4490        logger.setLevel(level=LOG_LEVEL)
4491
4492
4493# class _TestLoggingProcessName(BaseTestCase):
4494#
4495#     def handle(self, record):
4496#         assert record.processName == multiprocessing.current_process().name
4497#         self.__handled = True
4498#
4499#     def test_logging(self):
4500#         handler = logging.Handler()
4501#         handler.handle = self.handle
4502#         self.__handled = False
4503#         # Bypass getLogger() and side-effects
4504#         logger = logging.getLoggerClass()(
4505#                 'multiprocessing.test.TestLoggingProcessName')
4506#         logger.addHandler(handler)
4507#         logger.propagate = False
4508#
4509#         logger.warn('foo')
4510#         assert self.__handled
4511
4512#
4513# Check that Process.join() retries if os.waitpid() fails with EINTR
4514#
4515
4516class _TestPollEintr(BaseTestCase):
4517
4518    ALLOWED_TYPES = ('processes',)
4519
4520    @classmethod
4521    def _killer(cls, pid):
4522        time.sleep(0.1)
4523        os.kill(pid, signal.SIGUSR1)
4524
4525    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
4526    def test_poll_eintr(self):
4527        got_signal = [False]
4528        def record(*args):
4529            got_signal[0] = True
4530        pid = os.getpid()
4531        oldhandler = signal.signal(signal.SIGUSR1, record)
4532        try:
4533            killer = self.Process(target=self._killer, args=(pid,))
4534            killer.start()
4535            try:
4536                p = self.Process(target=time.sleep, args=(2,))
4537                p.start()
4538                p.join()
4539            finally:
4540                killer.join()
4541            self.assertTrue(got_signal[0])
4542            self.assertEqual(p.exitcode, 0)
4543        finally:
4544            signal.signal(signal.SIGUSR1, oldhandler)
4545
4546#
4547# Test to verify handle verification, see issue 3321
4548#
4549
4550class TestInvalidHandle(unittest.TestCase):
4551
4552    @unittest.skipIf(WIN32, "skipped on Windows")
4553    def test_invalid_handles(self):
4554        conn = multiprocessing.connection.Connection(44977608)
4555        # check that poll() doesn't crash
4556        try:
4557            conn.poll()
4558        except (ValueError, OSError):
4559            pass
4560        finally:
4561            # Hack private attribute _handle to avoid printing an error
4562            # in conn.__del__
4563            conn._handle = None
4564        self.assertRaises((ValueError, OSError),
4565                          multiprocessing.connection.Connection, -1)
4566
4567
4568
4569@hashlib_helper.requires_hashdigest('md5')
4570class OtherTest(unittest.TestCase):
4571    # TODO: add more tests for deliver/answer challenge.
4572    def test_deliver_challenge_auth_failure(self):
4573        class _FakeConnection(object):
4574            def recv_bytes(self, size):
4575                return b'something bogus'
4576            def send_bytes(self, data):
4577                pass
4578        self.assertRaises(multiprocessing.AuthenticationError,
4579                          multiprocessing.connection.deliver_challenge,
4580                          _FakeConnection(), b'abc')
4581
4582    def test_answer_challenge_auth_failure(self):
4583        class _FakeConnection(object):
4584            def __init__(self):
4585                self.count = 0
4586            def recv_bytes(self, size):
4587                self.count += 1
4588                if self.count == 1:
4589                    return multiprocessing.connection.CHALLENGE
4590                elif self.count == 2:
4591                    return b'something bogus'
4592                return b''
4593            def send_bytes(self, data):
4594                pass
4595        self.assertRaises(multiprocessing.AuthenticationError,
4596                          multiprocessing.connection.answer_challenge,
4597                          _FakeConnection(), b'abc')
4598
4599#
4600# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
4601#
4602
4603def initializer(ns):
4604    ns.test += 1
4605
4606@hashlib_helper.requires_hashdigest('md5')
4607class TestInitializers(unittest.TestCase):
4608    def setUp(self):
4609        self.mgr = multiprocessing.Manager()
4610        self.ns = self.mgr.Namespace()
4611        self.ns.test = 0
4612
4613    def tearDown(self):
4614        self.mgr.shutdown()
4615        self.mgr.join()
4616
4617    def test_manager_initializer(self):
4618        m = multiprocessing.managers.SyncManager()
4619        self.assertRaises(TypeError, m.start, 1)
4620        m.start(initializer, (self.ns,))
4621        self.assertEqual(self.ns.test, 1)
4622        m.shutdown()
4623        m.join()
4624
4625    def test_pool_initializer(self):
4626        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
4627        p = multiprocessing.Pool(1, initializer, (self.ns,))
4628        p.close()
4629        p.join()
4630        self.assertEqual(self.ns.test, 1)
4631
4632#
4633# Issue 5155, 5313, 5331: Test process in processes
4634# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
4635#
4636
4637def _this_sub_process(q):
4638    try:
4639        item = q.get(block=False)
4640    except pyqueue.Empty:
4641        pass
4642
4643def _test_process():
4644    queue = multiprocessing.Queue()
4645    subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,))
4646    subProc.daemon = True
4647    subProc.start()
4648    subProc.join()
4649
4650def _afunc(x):
4651    return x*x
4652
4653def pool_in_process():
4654    pool = multiprocessing.Pool(processes=4)
4655    x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
4656    pool.close()
4657    pool.join()
4658
4659class _file_like(object):
4660    def __init__(self, delegate):
4661        self._delegate = delegate
4662        self._pid = None
4663
4664    @property
4665    def cache(self):
4666        pid = os.getpid()
4667        # There are no race conditions since fork keeps only the running thread
4668        if pid != self._pid:
4669            self._pid = pid
4670            self._cache = []
4671        return self._cache
4672
4673    def write(self, data):
4674        self.cache.append(data)
4675
4676    def flush(self):
4677        self._delegate.write(''.join(self.cache))
4678        self._cache = []
4679
4680class TestStdinBadfiledescriptor(unittest.TestCase):
4681
4682    def test_queue_in_process(self):
4683        proc = multiprocessing.Process(target=_test_process)
4684        proc.start()
4685        proc.join()
4686
4687    def test_pool_in_process(self):
4688        p = multiprocessing.Process(target=pool_in_process)
4689        p.start()
4690        p.join()
4691
4692    def test_flushing(self):
4693        sio = io.StringIO()
4694        flike = _file_like(sio)
4695        flike.write('foo')
4696        proc = multiprocessing.Process(target=lambda: flike.flush())
4697        flike.flush()
4698        assert sio.getvalue() == 'foo'
4699
4700
4701class TestWait(unittest.TestCase):
4702
4703    @classmethod
4704    def _child_test_wait(cls, w, slow):
4705        for i in range(10):
4706            if slow:
4707                time.sleep(random.random()*0.1)
4708            w.send((i, os.getpid()))
4709        w.close()
4710
4711    def test_wait(self, slow=False):
4712        from multiprocessing.connection import wait
4713        readers = []
4714        procs = []
4715        messages = []
4716
4717        for i in range(4):
4718            r, w = multiprocessing.Pipe(duplex=False)
4719            p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow))
4720            p.daemon = True
4721            p.start()
4722            w.close()
4723            readers.append(r)
4724            procs.append(p)
4725            self.addCleanup(p.join)
4726
4727        while readers:
4728            for r in wait(readers):
4729                try:
4730                    msg = r.recv()
4731                except EOFError:
4732                    readers.remove(r)
4733                    r.close()
4734                else:
4735                    messages.append(msg)
4736
4737        messages.sort()
4738        expected = sorted((i, p.pid) for i in range(10) for p in procs)
4739        self.assertEqual(messages, expected)
4740
4741    @classmethod
4742    def _child_test_wait_socket(cls, address, slow):
4743        s = socket.socket()
4744        s.connect(address)
4745        for i in range(10):
4746            if slow:
4747                time.sleep(random.random()*0.1)
4748            s.sendall(('%s\n' % i).encode('ascii'))
4749        s.close()
4750
4751    def test_wait_socket(self, slow=False):
4752        from multiprocessing.connection import wait
4753        l = socket.create_server((socket_helper.HOST, 0))
4754        addr = l.getsockname()
4755        readers = []
4756        procs = []
4757        dic = {}
4758
4759        for i in range(4):
4760            p = multiprocessing.Process(target=self._child_test_wait_socket,
4761                                        args=(addr, slow))
4762            p.daemon = True
4763            p.start()
4764            procs.append(p)
4765            self.addCleanup(p.join)
4766
4767        for i in range(4):
4768            r, _ = l.accept()
4769            readers.append(r)
4770            dic[r] = []
4771        l.close()
4772
4773        while readers:
4774            for r in wait(readers):
4775                msg = r.recv(32)
4776                if not msg:
4777                    readers.remove(r)
4778                    r.close()
4779                else:
4780                    dic[r].append(msg)
4781
4782        expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
4783        for v in dic.values():
4784            self.assertEqual(b''.join(v), expected)
4785
4786    def test_wait_slow(self):
4787        self.test_wait(True)
4788
4789    def test_wait_socket_slow(self):
4790        self.test_wait_socket(True)
4791
4792    def test_wait_timeout(self):
4793        from multiprocessing.connection import wait
4794
4795        expected = 5
4796        a, b = multiprocessing.Pipe()
4797
4798        start = time.monotonic()
4799        res = wait([a, b], expected)
4800        delta = time.monotonic() - start
4801
4802        self.assertEqual(res, [])
4803        self.assertLess(delta, expected * 2)
4804        self.assertGreater(delta, expected * 0.5)
4805
4806        b.send(None)
4807
4808        start = time.monotonic()
4809        res = wait([a, b], 20)
4810        delta = time.monotonic() - start
4811
4812        self.assertEqual(res, [a])
4813        self.assertLess(delta, 0.4)
4814
4815    @classmethod
4816    def signal_and_sleep(cls, sem, period):
4817        sem.release()
4818        time.sleep(period)
4819
4820    def test_wait_integer(self):
4821        from multiprocessing.connection import wait
4822
4823        expected = 3
4824        sorted_ = lambda l: sorted(l, key=lambda x: id(x))
4825        sem = multiprocessing.Semaphore(0)
4826        a, b = multiprocessing.Pipe()
4827        p = multiprocessing.Process(target=self.signal_and_sleep,
4828                                    args=(sem, expected))
4829
4830        p.start()
4831        self.assertIsInstance(p.sentinel, int)
4832        self.assertTrue(sem.acquire(timeout=20))
4833
4834        start = time.monotonic()
4835        res = wait([a, p.sentinel, b], expected + 20)
4836        delta = time.monotonic() - start
4837
4838        self.assertEqual(res, [p.sentinel])
4839        self.assertLess(delta, expected + 2)
4840        self.assertGreater(delta, expected - 2)
4841
4842        a.send(None)
4843
4844        start = time.monotonic()
4845        res = wait([a, p.sentinel, b], 20)
4846        delta = time.monotonic() - start
4847
4848        self.assertEqual(sorted_(res), sorted_([p.sentinel, b]))
4849        self.assertLess(delta, 0.4)
4850
4851        b.send(None)
4852
4853        start = time.monotonic()
4854        res = wait([a, p.sentinel, b], 20)
4855        delta = time.monotonic() - start
4856
4857        self.assertEqual(sorted_(res), sorted_([a, p.sentinel, b]))
4858        self.assertLess(delta, 0.4)
4859
4860        p.terminate()
4861        p.join()
4862
4863    def test_neg_timeout(self):
4864        from multiprocessing.connection import wait
4865        a, b = multiprocessing.Pipe()
4866        t = time.monotonic()
4867        res = wait([a], timeout=-1)
4868        t = time.monotonic() - t
4869        self.assertEqual(res, [])
4870        self.assertLess(t, 1)
4871        a.close()
4872        b.close()
4873
4874#
4875# Issue 14151: Test invalid family on invalid environment
4876#
4877
4878class TestInvalidFamily(unittest.TestCase):
4879
4880    @unittest.skipIf(WIN32, "skipped on Windows")
4881    def test_invalid_family(self):
4882        with self.assertRaises(ValueError):
4883            multiprocessing.connection.Listener(r'\\.\test')
4884
4885    @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
4886    def test_invalid_family_win32(self):
4887        with self.assertRaises(ValueError):
4888            multiprocessing.connection.Listener('/var/test.pipe')
4889
4890#
4891# Issue 12098: check sys.flags of child matches that for parent
4892#
4893
4894class TestFlags(unittest.TestCase):
4895    @classmethod
4896    def run_in_grandchild(cls, conn):
4897        conn.send(tuple(sys.flags))
4898
4899    @classmethod
4900    def run_in_child(cls):
4901        import json
4902        r, w = multiprocessing.Pipe(duplex=False)
4903        p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,))
4904        p.start()
4905        grandchild_flags = r.recv()
4906        p.join()
4907        r.close()
4908        w.close()
4909        flags = (tuple(sys.flags), grandchild_flags)
4910        print(json.dumps(flags))
4911
4912    def test_flags(self):
4913        import json
4914        # start child process using unusual flags
4915        prog = ('from test._test_multiprocessing import TestFlags; ' +
4916                'TestFlags.run_in_child()')
4917        data = subprocess.check_output(
4918            [sys.executable, '-E', '-S', '-O', '-c', prog])
4919        child_flags, grandchild_flags = json.loads(data.decode('ascii'))
4920        self.assertEqual(child_flags, grandchild_flags)
4921
4922#
4923# Test interaction with socket timeouts - see Issue #6056
4924#
4925
4926class TestTimeouts(unittest.TestCase):
4927    @classmethod
4928    def _test_timeout(cls, child, address):
4929        time.sleep(1)
4930        child.send(123)
4931        child.close()
4932        conn = multiprocessing.connection.Client(address)
4933        conn.send(456)
4934        conn.close()
4935
4936    def test_timeout(self):
4937        old_timeout = socket.getdefaulttimeout()
4938        try:
4939            socket.setdefaulttimeout(0.1)
4940            parent, child = multiprocessing.Pipe(duplex=True)
4941            l = multiprocessing.connection.Listener(family='AF_INET')
4942            p = multiprocessing.Process(target=self._test_timeout,
4943                                        args=(child, l.address))
4944            p.start()
4945            child.close()
4946            self.assertEqual(parent.recv(), 123)
4947            parent.close()
4948            conn = l.accept()
4949            self.assertEqual(conn.recv(), 456)
4950            conn.close()
4951            l.close()
4952            join_process(p)
4953        finally:
4954            socket.setdefaulttimeout(old_timeout)
4955
4956#
4957# Test what happens with no "if __name__ == '__main__'"
4958#
4959
4960class TestNoForkBomb(unittest.TestCase):
4961    def test_noforkbomb(self):
4962        sm = multiprocessing.get_start_method()
4963        name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
4964        if sm != 'fork':
4965            rc, out, err = test.support.script_helper.assert_python_failure(name, sm)
4966            self.assertEqual(out, b'')
4967            self.assertIn(b'RuntimeError', err)
4968        else:
4969            rc, out, err = test.support.script_helper.assert_python_ok(name, sm)
4970            self.assertEqual(out.rstrip(), b'123')
4971            self.assertEqual(err, b'')
4972
4973#
4974# Issue #17555: ForkAwareThreadLock
4975#
4976
4977class TestForkAwareThreadLock(unittest.TestCase):
4978    # We recursively start processes.  Issue #17555 meant that the
4979    # after fork registry would get duplicate entries for the same
4980    # lock.  The size of the registry at generation n was ~2**n.
4981
4982    @classmethod
4983    def child(cls, n, conn):
4984        if n > 1:
4985            p = multiprocessing.Process(target=cls.child, args=(n-1, conn))
4986            p.start()
4987            conn.close()
4988            join_process(p)
4989        else:
4990            conn.send(len(util._afterfork_registry))
4991        conn.close()
4992
4993    def test_lock(self):
4994        r, w = multiprocessing.Pipe(False)
4995        l = util.ForkAwareThreadLock()
4996        old_size = len(util._afterfork_registry)
4997        p = multiprocessing.Process(target=self.child, args=(5, w))
4998        p.start()
4999        w.close()
5000        new_size = r.recv()
5001        join_process(p)
5002        self.assertLessEqual(new_size, old_size)
5003
5004#
5005# Check that non-forked child processes do not inherit unneeded fds/handles
5006#
5007
5008class TestCloseFds(unittest.TestCase):
5009
5010    def get_high_socket_fd(self):
5011        if WIN32:
5012            # The child process will not have any socket handles, so
5013            # calling socket.fromfd() should produce WSAENOTSOCK even
5014            # if there is a handle of the same number.
5015            return socket.socket().detach()
5016        else:
5017            # We want to produce a socket with an fd high enough that a
5018            # freshly created child process will not have any fds as high.
5019            fd = socket.socket().detach()
5020            to_close = []
5021            while fd < 50:
5022                to_close.append(fd)
5023                fd = os.dup(fd)
5024            for x in to_close:
5025                os.close(x)
5026            return fd
5027
5028    def close(self, fd):
5029        if WIN32:
5030            socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=fd).close()
5031        else:
5032            os.close(fd)
5033
5034    @classmethod
5035    def _test_closefds(cls, conn, fd):
5036        try:
5037            s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
5038        except Exception as e:
5039            conn.send(e)
5040        else:
5041            s.close()
5042            conn.send(None)
5043
5044    def test_closefd(self):
5045        if not HAS_REDUCTION:
5046            raise unittest.SkipTest('requires fd pickling')
5047
5048        reader, writer = multiprocessing.Pipe()
5049        fd = self.get_high_socket_fd()
5050        try:
5051            p = multiprocessing.Process(target=self._test_closefds,
5052                                        args=(writer, fd))
5053            p.start()
5054            writer.close()
5055            e = reader.recv()
5056            join_process(p)
5057        finally:
5058            self.close(fd)
5059            writer.close()
5060            reader.close()
5061
5062        if multiprocessing.get_start_method() == 'fork':
5063            self.assertIs(e, None)
5064        else:
5065            WSAENOTSOCK = 10038
5066            self.assertIsInstance(e, OSError)
5067            self.assertTrue(e.errno == errno.EBADF or
5068                            e.winerror == WSAENOTSOCK, e)
5069
5070#
5071# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
5072#
5073
5074class TestIgnoreEINTR(unittest.TestCase):
5075
5076    # Sending CONN_MAX_SIZE bytes into a multiprocessing pipe must block
5077    CONN_MAX_SIZE = max(support.PIPE_MAX_SIZE, support.SOCK_MAX_SIZE)
5078
5079    @classmethod
5080    def _test_ignore(cls, conn):
5081        def handler(signum, frame):
5082            pass
5083        signal.signal(signal.SIGUSR1, handler)
5084        conn.send('ready')
5085        x = conn.recv()
5086        conn.send(x)
5087        conn.send_bytes(b'x' * cls.CONN_MAX_SIZE)
5088
5089    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
5090    def test_ignore(self):
5091        conn, child_conn = multiprocessing.Pipe()
5092        try:
5093            p = multiprocessing.Process(target=self._test_ignore,
5094                                        args=(child_conn,))
5095            p.daemon = True
5096            p.start()
5097            child_conn.close()
5098            self.assertEqual(conn.recv(), 'ready')
5099            time.sleep(0.1)
5100            os.kill(p.pid, signal.SIGUSR1)
5101            time.sleep(0.1)
5102            conn.send(1234)
5103            self.assertEqual(conn.recv(), 1234)
5104            time.sleep(0.1)
5105            os.kill(p.pid, signal.SIGUSR1)
5106            self.assertEqual(conn.recv_bytes(), b'x' * self.CONN_MAX_SIZE)
5107            time.sleep(0.1)
5108            p.join()
5109        finally:
5110            conn.close()
5111
5112    @classmethod
5113    def _test_ignore_listener(cls, conn):
5114        def handler(signum, frame):
5115            pass
5116        signal.signal(signal.SIGUSR1, handler)
5117        with multiprocessing.connection.Listener() as l:
5118            conn.send(l.address)
5119            a = l.accept()
5120            a.send('welcome')
5121
5122    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
5123    def test_ignore_listener(self):
5124        conn, child_conn = multiprocessing.Pipe()
5125        try:
5126            p = multiprocessing.Process(target=self._test_ignore_listener,
5127                                        args=(child_conn,))
5128            p.daemon = True
5129            p.start()
5130            child_conn.close()
5131            address = conn.recv()
5132            time.sleep(0.1)
5133            os.kill(p.pid, signal.SIGUSR1)
5134            time.sleep(0.1)
5135            client = multiprocessing.connection.Client(address)
5136            self.assertEqual(client.recv(), 'welcome')
5137            p.join()
5138        finally:
5139            conn.close()
5140
5141class TestStartMethod(unittest.TestCase):
5142    @classmethod
5143    def _check_context(cls, conn):
5144        conn.send(multiprocessing.get_start_method())
5145
5146    def check_context(self, ctx):
5147        r, w = ctx.Pipe(duplex=False)
5148        p = ctx.Process(target=self._check_context, args=(w,))
5149        p.start()
5150        w.close()
5151        child_method = r.recv()
5152        r.close()
5153        p.join()
5154        self.assertEqual(child_method, ctx.get_start_method())
5155
5156    def test_context(self):
5157        for method in ('fork', 'spawn', 'forkserver'):
5158            try:
5159                ctx = multiprocessing.get_context(method)
5160            except ValueError:
5161                continue
5162            self.assertEqual(ctx.get_start_method(), method)
5163            self.assertIs(ctx.get_context(), ctx)
5164            self.assertRaises(ValueError, ctx.set_start_method, 'spawn')
5165            self.assertRaises(ValueError, ctx.set_start_method, None)
5166            self.check_context(ctx)
5167
5168    def test_set_get(self):
5169        multiprocessing.set_forkserver_preload(PRELOAD)
5170        count = 0
5171        old_method = multiprocessing.get_start_method()
5172        try:
5173            for method in ('fork', 'spawn', 'forkserver'):
5174                try:
5175                    multiprocessing.set_start_method(method, force=True)
5176                except ValueError:
5177                    continue
5178                self.assertEqual(multiprocessing.get_start_method(), method)
5179                ctx = multiprocessing.get_context()
5180                self.assertEqual(ctx.get_start_method(), method)
5181                self.assertTrue(type(ctx).__name__.lower().startswith(method))
5182                self.assertTrue(
5183                    ctx.Process.__name__.lower().startswith(method))
5184                self.check_context(multiprocessing)
5185                count += 1
5186        finally:
5187            multiprocessing.set_start_method(old_method, force=True)
5188        self.assertGreaterEqual(count, 1)
5189
5190    def test_get_all(self):
5191        methods = multiprocessing.get_all_start_methods()
5192        if sys.platform == 'win32':
5193            self.assertEqual(methods, ['spawn'])
5194        else:
5195            self.assertTrue(methods == ['fork', 'spawn'] or
5196                            methods == ['spawn', 'fork'] or
5197                            methods == ['fork', 'spawn', 'forkserver'] or
5198                            methods == ['spawn', 'fork', 'forkserver'])
5199
5200    def test_preload_resources(self):
5201        if multiprocessing.get_start_method() != 'forkserver':
5202            self.skipTest("test only relevant for 'forkserver' method")
5203        name = os.path.join(os.path.dirname(__file__), 'mp_preload.py')
5204        rc, out, err = test.support.script_helper.assert_python_ok(name)
5205        out = out.decode()
5206        err = err.decode()
5207        if out.rstrip() != 'ok' or err != '':
5208            print(out)
5209            print(err)
5210            self.fail("failed spawning forkserver or grandchild")
5211
5212
5213@unittest.skipIf(sys.platform == "win32",
5214                 "test semantics don't make sense on Windows")
5215class TestResourceTracker(unittest.TestCase):
5216
5217    def test_resource_tracker(self):
5218        #
5219        # Check that killing process does not leak named semaphores
5220        #
5221        cmd = '''if 1:
5222            import time, os, tempfile
5223            import multiprocessing as mp
5224            from multiprocessing import resource_tracker
5225            from multiprocessing.shared_memory import SharedMemory
5226
5227            mp.set_start_method("spawn")
5228            rand = tempfile._RandomNameSequence()
5229
5230
5231            def create_and_register_resource(rtype):
5232                if rtype == "semaphore":
5233                    lock = mp.Lock()
5234                    return lock, lock._semlock.name
5235                elif rtype == "shared_memory":
5236                    sm = SharedMemory(create=True, size=10)
5237                    return sm, sm._name
5238                else:
5239                    raise ValueError(
5240                        "Resource type {{}} not understood".format(rtype))
5241
5242
5243            resource1, rname1 = create_and_register_resource("{rtype}")
5244            resource2, rname2 = create_and_register_resource("{rtype}")
5245
5246            os.write({w}, rname1.encode("ascii") + b"\\n")
5247            os.write({w}, rname2.encode("ascii") + b"\\n")
5248
5249            time.sleep(10)
5250        '''
5251        for rtype in resource_tracker._CLEANUP_FUNCS:
5252            with self.subTest(rtype=rtype):
5253                if rtype == "noop":
5254                    # Artefact resource type used by the resource_tracker
5255                    continue
5256                r, w = os.pipe()
5257                p = subprocess.Popen([sys.executable,
5258                                     '-E', '-c', cmd.format(w=w, rtype=rtype)],
5259                                     pass_fds=[w],
5260                                     stderr=subprocess.PIPE)
5261                os.close(w)
5262                with open(r, 'rb', closefd=True) as f:
5263                    name1 = f.readline().rstrip().decode('ascii')
5264                    name2 = f.readline().rstrip().decode('ascii')
5265                _resource_unlink(name1, rtype)
5266                p.terminate()
5267                p.wait()
5268
5269                deadline = time.monotonic() + support.LONG_TIMEOUT
5270                while time.monotonic() < deadline:
5271                    time.sleep(.5)
5272                    try:
5273                        _resource_unlink(name2, rtype)
5274                    except OSError as e:
5275                        # docs say it should be ENOENT, but OSX seems to give
5276                        # EINVAL
5277                        self.assertIn(e.errno, (errno.ENOENT, errno.EINVAL))
5278                        break
5279                else:
5280                    raise AssertionError(
5281                        f"A {rtype} resource was leaked after a process was "
5282                        f"abruptly terminated.")
5283                err = p.stderr.read().decode('utf-8')
5284                p.stderr.close()
5285                expected = ('resource_tracker: There appear to be 2 leaked {} '
5286                            'objects'.format(
5287                            rtype))
5288                self.assertRegex(err, expected)
5289                self.assertRegex(err, r'resource_tracker: %r: \[Errno' % name1)
5290
5291    def check_resource_tracker_death(self, signum, should_die):
5292        # bpo-31310: if the semaphore tracker process has died, it should
5293        # be restarted implicitly.
5294        from multiprocessing.resource_tracker import _resource_tracker
5295        pid = _resource_tracker._pid
5296        if pid is not None:
5297            os.kill(pid, signal.SIGKILL)
5298            support.wait_process(pid, exitcode=-signal.SIGKILL)
5299        with warnings.catch_warnings():
5300            warnings.simplefilter("ignore")
5301            _resource_tracker.ensure_running()
5302        pid = _resource_tracker._pid
5303
5304        os.kill(pid, signum)
5305        time.sleep(1.0)  # give it time to die
5306
5307        ctx = multiprocessing.get_context("spawn")
5308        with warnings.catch_warnings(record=True) as all_warn:
5309            warnings.simplefilter("always")
5310            sem = ctx.Semaphore()
5311            sem.acquire()
5312            sem.release()
5313            wr = weakref.ref(sem)
5314            # ensure `sem` gets collected, which triggers communication with
5315            # the semaphore tracker
5316            del sem
5317            gc.collect()
5318            self.assertIsNone(wr())
5319            if should_die:
5320                self.assertEqual(len(all_warn), 1)
5321                the_warn = all_warn[0]
5322                self.assertTrue(issubclass(the_warn.category, UserWarning))
5323                self.assertTrue("resource_tracker: process died"
5324                                in str(the_warn.message))
5325            else:
5326                self.assertEqual(len(all_warn), 0)
5327
5328    def test_resource_tracker_sigint(self):
5329        # Catchable signal (ignored by semaphore tracker)
5330        self.check_resource_tracker_death(signal.SIGINT, False)
5331
5332    def test_resource_tracker_sigterm(self):
5333        # Catchable signal (ignored by semaphore tracker)
5334        self.check_resource_tracker_death(signal.SIGTERM, False)
5335
5336    def test_resource_tracker_sigkill(self):
5337        # Uncatchable signal.
5338        self.check_resource_tracker_death(signal.SIGKILL, True)
5339
5340    @staticmethod
5341    def _is_resource_tracker_reused(conn, pid):
5342        from multiprocessing.resource_tracker import _resource_tracker
5343        _resource_tracker.ensure_running()
5344        # The pid should be None in the child process, expect for the fork
5345        # context. It should not be a new value.
5346        reused = _resource_tracker._pid in (None, pid)
5347        reused &= _resource_tracker._check_alive()
5348        conn.send(reused)
5349
5350    def test_resource_tracker_reused(self):
5351        from multiprocessing.resource_tracker import _resource_tracker
5352        _resource_tracker.ensure_running()
5353        pid = _resource_tracker._pid
5354
5355        r, w = multiprocessing.Pipe(duplex=False)
5356        p = multiprocessing.Process(target=self._is_resource_tracker_reused,
5357                                    args=(w, pid))
5358        p.start()
5359        is_resource_tracker_reused = r.recv()
5360
5361        # Clean up
5362        p.join()
5363        w.close()
5364        r.close()
5365
5366        self.assertTrue(is_resource_tracker_reused)
5367
5368
5369class TestSimpleQueue(unittest.TestCase):
5370
5371    @classmethod
5372    def _test_empty(cls, queue, child_can_start, parent_can_continue):
5373        child_can_start.wait()
5374        # issue 30301, could fail under spawn and forkserver
5375        try:
5376            queue.put(queue.empty())
5377            queue.put(queue.empty())
5378        finally:
5379            parent_can_continue.set()
5380
5381    def test_empty(self):
5382        queue = multiprocessing.SimpleQueue()
5383        child_can_start = multiprocessing.Event()
5384        parent_can_continue = multiprocessing.Event()
5385
5386        proc = multiprocessing.Process(
5387            target=self._test_empty,
5388            args=(queue, child_can_start, parent_can_continue)
5389        )
5390        proc.daemon = True
5391        proc.start()
5392
5393        self.assertTrue(queue.empty())
5394
5395        child_can_start.set()
5396        parent_can_continue.wait()
5397
5398        self.assertFalse(queue.empty())
5399        self.assertEqual(queue.get(), True)
5400        self.assertEqual(queue.get(), False)
5401        self.assertTrue(queue.empty())
5402
5403        proc.join()
5404
5405    def test_close(self):
5406        queue = multiprocessing.SimpleQueue()
5407        queue.close()
5408        # closing a queue twice should not fail
5409        queue.close()
5410
5411    # Test specific to CPython since it tests private attributes
5412    @test.support.cpython_only
5413    def test_closed(self):
5414        queue = multiprocessing.SimpleQueue()
5415        queue.close()
5416        self.assertTrue(queue._reader.closed)
5417        self.assertTrue(queue._writer.closed)
5418
5419
5420class TestPoolNotLeakOnFailure(unittest.TestCase):
5421
5422    def test_release_unused_processes(self):
5423        # Issue #19675: During pool creation, if we can't create a process,
5424        # don't leak already created ones.
5425        will_fail_in = 3
5426        forked_processes = []
5427
5428        class FailingForkProcess:
5429            def __init__(self, **kwargs):
5430                self.name = 'Fake Process'
5431                self.exitcode = None
5432                self.state = None
5433                forked_processes.append(self)
5434
5435            def start(self):
5436                nonlocal will_fail_in
5437                if will_fail_in <= 0:
5438                    raise OSError("Manually induced OSError")
5439                will_fail_in -= 1
5440                self.state = 'started'
5441
5442            def terminate(self):
5443                self.state = 'stopping'
5444
5445            def join(self):
5446                if self.state == 'stopping':
5447                    self.state = 'stopped'
5448
5449            def is_alive(self):
5450                return self.state == 'started' or self.state == 'stopping'
5451
5452        with self.assertRaisesRegex(OSError, 'Manually induced OSError'):
5453            p = multiprocessing.pool.Pool(5, context=unittest.mock.MagicMock(
5454                Process=FailingForkProcess))
5455            p.close()
5456            p.join()
5457        self.assertFalse(
5458            any(process.is_alive() for process in forked_processes))
5459
5460
5461@hashlib_helper.requires_hashdigest('md5')
5462class TestSyncManagerTypes(unittest.TestCase):
5463    """Test all the types which can be shared between a parent and a
5464    child process by using a manager which acts as an intermediary
5465    between them.
5466
5467    In the following unit-tests the base type is created in the parent
5468    process, the @classmethod represents the worker process and the
5469    shared object is readable and editable between the two.
5470
5471    # The child.
5472    @classmethod
5473    def _test_list(cls, obj):
5474        assert obj[0] == 5
5475        assert obj.append(6)
5476
5477    # The parent.
5478    def test_list(self):
5479        o = self.manager.list()
5480        o.append(5)
5481        self.run_worker(self._test_list, o)
5482        assert o[1] == 6
5483    """
5484    manager_class = multiprocessing.managers.SyncManager
5485
5486    def setUp(self):
5487        self.manager = self.manager_class()
5488        self.manager.start()
5489        self.proc = None
5490
5491    def tearDown(self):
5492        if self.proc is not None and self.proc.is_alive():
5493            self.proc.terminate()
5494            self.proc.join()
5495        self.manager.shutdown()
5496        self.manager = None
5497        self.proc = None
5498
5499    @classmethod
5500    def setUpClass(cls):
5501        support.reap_children()
5502
5503    tearDownClass = setUpClass
5504
5505    def wait_proc_exit(self):
5506        # Only the manager process should be returned by active_children()
5507        # but this can take a bit on slow machines, so wait a few seconds
5508        # if there are other children too (see #17395).
5509        join_process(self.proc)
5510        start_time = time.monotonic()
5511        t = 0.01
5512        while len(multiprocessing.active_children()) > 1:
5513            time.sleep(t)
5514            t *= 2
5515            dt = time.monotonic() - start_time
5516            if dt >= 5.0:
5517                test.support.environment_altered = True
5518                support.print_warning(f"multiprocessing.Manager still has "
5519                                      f"{multiprocessing.active_children()} "
5520                                      f"active children after {dt} seconds")
5521                break
5522
5523    def run_worker(self, worker, obj):
5524        self.proc = multiprocessing.Process(target=worker, args=(obj, ))
5525        self.proc.daemon = True
5526        self.proc.start()
5527        self.wait_proc_exit()
5528        self.assertEqual(self.proc.exitcode, 0)
5529
5530    @classmethod
5531    def _test_event(cls, obj):
5532        assert obj.is_set()
5533        obj.wait()
5534        obj.clear()
5535        obj.wait(0.001)
5536
5537    def test_event(self):
5538        o = self.manager.Event()
5539        o.set()
5540        self.run_worker(self._test_event, o)
5541        assert not o.is_set()
5542        o.wait(0.001)
5543
5544    @classmethod
5545    def _test_lock(cls, obj):
5546        obj.acquire()
5547
5548    def test_lock(self, lname="Lock"):
5549        o = getattr(self.manager, lname)()
5550        self.run_worker(self._test_lock, o)
5551        o.release()
5552        self.assertRaises(RuntimeError, o.release)  # already released
5553
5554    @classmethod
5555    def _test_rlock(cls, obj):
5556        obj.acquire()
5557        obj.release()
5558
5559    def test_rlock(self, lname="Lock"):
5560        o = getattr(self.manager, lname)()
5561        self.run_worker(self._test_rlock, o)
5562
5563    @classmethod
5564    def _test_semaphore(cls, obj):
5565        obj.acquire()
5566
5567    def test_semaphore(self, sname="Semaphore"):
5568        o = getattr(self.manager, sname)()
5569        self.run_worker(self._test_semaphore, o)
5570        o.release()
5571
5572    def test_bounded_semaphore(self):
5573        self.test_semaphore(sname="BoundedSemaphore")
5574
5575    @classmethod
5576    def _test_condition(cls, obj):
5577        obj.acquire()
5578        obj.release()
5579
5580    def test_condition(self):
5581        o = self.manager.Condition()
5582        self.run_worker(self._test_condition, o)
5583
5584    @classmethod
5585    def _test_barrier(cls, obj):
5586        assert obj.parties == 5
5587        obj.reset()
5588
5589    def test_barrier(self):
5590        o = self.manager.Barrier(5)
5591        self.run_worker(self._test_barrier, o)
5592
5593    @classmethod
5594    def _test_pool(cls, obj):
5595        # TODO: fix https://bugs.python.org/issue35919
5596        with obj:
5597            pass
5598
5599    def test_pool(self):
5600        o = self.manager.Pool(processes=4)
5601        self.run_worker(self._test_pool, o)
5602
5603    @classmethod
5604    def _test_queue(cls, obj):
5605        assert obj.qsize() == 2
5606        assert obj.full()
5607        assert not obj.empty()
5608        assert obj.get() == 5
5609        assert not obj.empty()
5610        assert obj.get() == 6
5611        assert obj.empty()
5612
5613    def test_queue(self, qname="Queue"):
5614        o = getattr(self.manager, qname)(2)
5615        o.put(5)
5616        o.put(6)
5617        self.run_worker(self._test_queue, o)
5618        assert o.empty()
5619        assert not o.full()
5620
5621    def test_joinable_queue(self):
5622        self.test_queue("JoinableQueue")
5623
5624    @classmethod
5625    def _test_list(cls, obj):
5626        assert obj[0] == 5
5627        assert obj.count(5) == 1
5628        assert obj.index(5) == 0
5629        obj.sort()
5630        obj.reverse()
5631        for x in obj:
5632            pass
5633        assert len(obj) == 1
5634        assert obj.pop(0) == 5
5635
5636    def test_list(self):
5637        o = self.manager.list()
5638        o.append(5)
5639        self.run_worker(self._test_list, o)
5640        assert not o
5641        self.assertEqual(len(o), 0)
5642
5643    @classmethod
5644    def _test_dict(cls, obj):
5645        assert len(obj) == 1
5646        assert obj['foo'] == 5
5647        assert obj.get('foo') == 5
5648        assert list(obj.items()) == [('foo', 5)]
5649        assert list(obj.keys()) == ['foo']
5650        assert list(obj.values()) == [5]
5651        assert obj.copy() == {'foo': 5}
5652        assert obj.popitem() == ('foo', 5)
5653
5654    def test_dict(self):
5655        o = self.manager.dict()
5656        o['foo'] = 5
5657        self.run_worker(self._test_dict, o)
5658        assert not o
5659        self.assertEqual(len(o), 0)
5660
5661    @classmethod
5662    def _test_value(cls, obj):
5663        assert obj.value == 1
5664        assert obj.get() == 1
5665        obj.set(2)
5666
5667    def test_value(self):
5668        o = self.manager.Value('i', 1)
5669        self.run_worker(self._test_value, o)
5670        self.assertEqual(o.value, 2)
5671        self.assertEqual(o.get(), 2)
5672
5673    @classmethod
5674    def _test_array(cls, obj):
5675        assert obj[0] == 0
5676        assert obj[1] == 1
5677        assert len(obj) == 2
5678        assert list(obj) == [0, 1]
5679
5680    def test_array(self):
5681        o = self.manager.Array('i', [0, 1])
5682        self.run_worker(self._test_array, o)
5683
5684    @classmethod
5685    def _test_namespace(cls, obj):
5686        assert obj.x == 0
5687        assert obj.y == 1
5688
5689    def test_namespace(self):
5690        o = self.manager.Namespace()
5691        o.x = 0
5692        o.y = 1
5693        self.run_worker(self._test_namespace, o)
5694
5695
5696class MiscTestCase(unittest.TestCase):
5697    def test__all__(self):
5698        # Just make sure names in not_exported are excluded
5699        support.check__all__(self, multiprocessing, extra=multiprocessing.__all__,
5700                             not_exported=['SUBDEBUG', 'SUBWARNING'])
5701
5702
5703#
5704# Mixins
5705#
5706
5707class BaseMixin(object):
5708    @classmethod
5709    def setUpClass(cls):
5710        cls.dangling = (multiprocessing.process._dangling.copy(),
5711                        threading._dangling.copy())
5712
5713    @classmethod
5714    def tearDownClass(cls):
5715        # bpo-26762: Some multiprocessing objects like Pool create reference
5716        # cycles. Trigger a garbage collection to break these cycles.
5717        test.support.gc_collect()
5718
5719        processes = set(multiprocessing.process._dangling) - set(cls.dangling[0])
5720        if processes:
5721            test.support.environment_altered = True
5722            support.print_warning(f'Dangling processes: {processes}')
5723        processes = None
5724
5725        threads = set(threading._dangling) - set(cls.dangling[1])
5726        if threads:
5727            test.support.environment_altered = True
5728            support.print_warning(f'Dangling threads: {threads}')
5729        threads = None
5730
5731
5732class ProcessesMixin(BaseMixin):
5733    TYPE = 'processes'
5734    Process = multiprocessing.Process
5735    connection = multiprocessing.connection
5736    current_process = staticmethod(multiprocessing.current_process)
5737    parent_process = staticmethod(multiprocessing.parent_process)
5738    active_children = staticmethod(multiprocessing.active_children)
5739    Pool = staticmethod(multiprocessing.Pool)
5740    Pipe = staticmethod(multiprocessing.Pipe)
5741    Queue = staticmethod(multiprocessing.Queue)
5742    JoinableQueue = staticmethod(multiprocessing.JoinableQueue)
5743    Lock = staticmethod(multiprocessing.Lock)
5744    RLock = staticmethod(multiprocessing.RLock)
5745    Semaphore = staticmethod(multiprocessing.Semaphore)
5746    BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
5747    Condition = staticmethod(multiprocessing.Condition)
5748    Event = staticmethod(multiprocessing.Event)
5749    Barrier = staticmethod(multiprocessing.Barrier)
5750    Value = staticmethod(multiprocessing.Value)
5751    Array = staticmethod(multiprocessing.Array)
5752    RawValue = staticmethod(multiprocessing.RawValue)
5753    RawArray = staticmethod(multiprocessing.RawArray)
5754
5755
5756class ManagerMixin(BaseMixin):
5757    TYPE = 'manager'
5758    Process = multiprocessing.Process
5759    Queue = property(operator.attrgetter('manager.Queue'))
5760    JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))
5761    Lock = property(operator.attrgetter('manager.Lock'))
5762    RLock = property(operator.attrgetter('manager.RLock'))
5763    Semaphore = property(operator.attrgetter('manager.Semaphore'))
5764    BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
5765    Condition = property(operator.attrgetter('manager.Condition'))
5766    Event = property(operator.attrgetter('manager.Event'))
5767    Barrier = property(operator.attrgetter('manager.Barrier'))
5768    Value = property(operator.attrgetter('manager.Value'))
5769    Array = property(operator.attrgetter('manager.Array'))
5770    list = property(operator.attrgetter('manager.list'))
5771    dict = property(operator.attrgetter('manager.dict'))
5772    Namespace = property(operator.attrgetter('manager.Namespace'))
5773
5774    @classmethod
5775    def Pool(cls, *args, **kwds):
5776        return cls.manager.Pool(*args, **kwds)
5777
5778    @classmethod
5779    def setUpClass(cls):
5780        super().setUpClass()
5781        cls.manager = multiprocessing.Manager()
5782
5783    @classmethod
5784    def tearDownClass(cls):
5785        # only the manager process should be returned by active_children()
5786        # but this can take a bit on slow machines, so wait a few seconds
5787        # if there are other children too (see #17395)
5788        start_time = time.monotonic()
5789        t = 0.01
5790        while len(multiprocessing.active_children()) > 1:
5791            time.sleep(t)
5792            t *= 2
5793            dt = time.monotonic() - start_time
5794            if dt >= 5.0:
5795                test.support.environment_altered = True
5796                support.print_warning(f"multiprocessing.Manager still has "
5797                                      f"{multiprocessing.active_children()} "
5798                                      f"active children after {dt} seconds")
5799                break
5800
5801        gc.collect()                       # do garbage collection
5802        if cls.manager._number_of_objects() != 0:
5803            # This is not really an error since some tests do not
5804            # ensure that all processes which hold a reference to a
5805            # managed object have been joined.
5806            test.support.environment_altered = True
5807            support.print_warning('Shared objects which still exist '
5808                                  'at manager shutdown:')
5809            support.print_warning(cls.manager._debug_info())
5810        cls.manager.shutdown()
5811        cls.manager.join()
5812        cls.manager = None
5813
5814        super().tearDownClass()
5815
5816
5817class ThreadsMixin(BaseMixin):
5818    TYPE = 'threads'
5819    Process = multiprocessing.dummy.Process
5820    connection = multiprocessing.dummy.connection
5821    current_process = staticmethod(multiprocessing.dummy.current_process)
5822    active_children = staticmethod(multiprocessing.dummy.active_children)
5823    Pool = staticmethod(multiprocessing.dummy.Pool)
5824    Pipe = staticmethod(multiprocessing.dummy.Pipe)
5825    Queue = staticmethod(multiprocessing.dummy.Queue)
5826    JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)
5827    Lock = staticmethod(multiprocessing.dummy.Lock)
5828    RLock = staticmethod(multiprocessing.dummy.RLock)
5829    Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
5830    BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
5831    Condition = staticmethod(multiprocessing.dummy.Condition)
5832    Event = staticmethod(multiprocessing.dummy.Event)
5833    Barrier = staticmethod(multiprocessing.dummy.Barrier)
5834    Value = staticmethod(multiprocessing.dummy.Value)
5835    Array = staticmethod(multiprocessing.dummy.Array)
5836
5837#
5838# Functions used to create test cases from the base ones in this module
5839#
5840
5841def install_tests_in_module_dict(remote_globs, start_method):
5842    __module__ = remote_globs['__name__']
5843    local_globs = globals()
5844    ALL_TYPES = {'processes', 'threads', 'manager'}
5845
5846    for name, base in local_globs.items():
5847        if not isinstance(base, type):
5848            continue
5849        if issubclass(base, BaseTestCase):
5850            if base is BaseTestCase:
5851                continue
5852            assert set(base.ALLOWED_TYPES) <= ALL_TYPES, base.ALLOWED_TYPES
5853            for type_ in base.ALLOWED_TYPES:
5854                newname = 'With' + type_.capitalize() + name[1:]
5855                Mixin = local_globs[type_.capitalize() + 'Mixin']
5856                class Temp(base, Mixin, unittest.TestCase):
5857                    pass
5858                if type_ == 'manager':
5859                    Temp = hashlib_helper.requires_hashdigest('md5')(Temp)
5860                Temp.__name__ = Temp.__qualname__ = newname
5861                Temp.__module__ = __module__
5862                remote_globs[newname] = Temp
5863        elif issubclass(base, unittest.TestCase):
5864            class Temp(base, object):
5865                pass
5866            Temp.__name__ = Temp.__qualname__ = name
5867            Temp.__module__ = __module__
5868            remote_globs[name] = Temp
5869
5870    dangling = [None, None]
5871    old_start_method = [None]
5872
5873    def setUpModule():
5874        multiprocessing.set_forkserver_preload(PRELOAD)
5875        multiprocessing.process._cleanup()
5876        dangling[0] = multiprocessing.process._dangling.copy()
5877        dangling[1] = threading._dangling.copy()
5878        old_start_method[0] = multiprocessing.get_start_method(allow_none=True)
5879        try:
5880            multiprocessing.set_start_method(start_method, force=True)
5881        except ValueError:
5882            raise unittest.SkipTest(start_method +
5883                                    ' start method not supported')
5884
5885        if sys.platform.startswith("linux"):
5886            try:
5887                lock = multiprocessing.RLock()
5888            except OSError:
5889                raise unittest.SkipTest("OSError raises on RLock creation, "
5890                                        "see issue 3111!")
5891        check_enough_semaphores()
5892        util.get_temp_dir()     # creates temp directory
5893        multiprocessing.get_logger().setLevel(LOG_LEVEL)
5894
5895    def tearDownModule():
5896        need_sleep = False
5897
5898        # bpo-26762: Some multiprocessing objects like Pool create reference
5899        # cycles. Trigger a garbage collection to break these cycles.
5900        test.support.gc_collect()
5901
5902        multiprocessing.set_start_method(old_start_method[0], force=True)
5903        # pause a bit so we don't get warning about dangling threads/processes
5904        processes = set(multiprocessing.process._dangling) - set(dangling[0])
5905        if processes:
5906            need_sleep = True
5907            test.support.environment_altered = True
5908            support.print_warning(f'Dangling processes: {processes}')
5909        processes = None
5910
5911        threads = set(threading._dangling) - set(dangling[1])
5912        if threads:
5913            need_sleep = True
5914            test.support.environment_altered = True
5915            support.print_warning(f'Dangling threads: {threads}')
5916        threads = None
5917
5918        # Sleep 500 ms to give time to child processes to complete.
5919        if need_sleep:
5920            time.sleep(0.5)
5921
5922        multiprocessing.util._cleanup_tests()
5923
5924    remote_globs['setUpModule'] = setUpModule
5925    remote_globs['tearDownModule'] = tearDownModule
5926