#
# Unit tests for the multiprocessing package
#

import unittest
import unittest.mock
import queue as pyqueue
import textwrap
import time
import io
import itertools
import sys
import os
import gc
import importlib
import errno
import functools
import signal
import array
import socket
import random
import logging
import shutil
import subprocess
import struct
import tempfile
import operator
import pickle
import weakref
import warnings
import test.support
import test.support.script_helper
from test import support
from test.support import hashlib_helper
from test.support import import_helper
from test.support import os_helper
from test.support import script_helper
from test.support import socket_helper
from test.support import threading_helper
from test.support import warnings_helper


# Skip tests if _multiprocessing wasn't built.
_multiprocessing = import_helper.import_module('_multiprocessing')
# Skip tests if sem_open implementation is broken.
support.skip_if_broken_multiprocessing_synchronize()
import threading

import multiprocessing.connection
import multiprocessing.dummy
import multiprocessing.heap
import multiprocessing.managers
import multiprocessing.pool
import multiprocessing.queues
from multiprocessing.connection import wait

from multiprocessing import util

try:
    from multiprocessing import reduction
    HAS_REDUCTION = reduction.HAVE_SEND_HANDLE
except ImportError:
    HAS_REDUCTION = False

try:
    from multiprocessing.sharedctypes import Value, copy
    HAS_SHAREDCTYPES = True
except ImportError:
    HAS_SHAREDCTYPES = False

try:
    from multiprocessing import shared_memory
    HAS_SHMEM = True
except ImportError:
    HAS_SHMEM = False

try:
    import msvcrt
except ImportError:
    msvcrt = None


if support.HAVE_ASAN_FORK_BUG:
    # gh-89363: Skip multiprocessing tests if Python is built with ASAN to
    # work around a libasan race condition: a deadlock in pthread_create().
    raise unittest.SkipTest("libasan has a pthread_create() deadlock related to thread+fork")


# gh-110666: Tolerate a difference of 100 ms when comparing timings
# (clock resolution)
CLOCK_RES = 0.100


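# Encode a str to bytes using the Latin-1 codec ('latin' is an alias for
# 'latin-1'); several tests use this to build byte payloads.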
def latin(s):
    return s.encode('latin')


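# Fully dispose of a multiprocessing.queues.Queue: close() stops the feeder
# thread and join_thread() waits for it to flush; other queue types used in
# these tests need no such cleanup.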
def close_queue(queue):
    if isinstance(queue, multiprocessing.queues.Queue):
        queue.close()
        queue.join_thread()


def join_process(process):
    # Since multiprocessing.Process has the same API as threading.Thread
    # (join() and is_alive()), the threading support function can be reused.
    threading_helper.join_thread(process)


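# On POSIX, expose the resource tracker's cleanup table so that tests can
# unlink leaked named resources (e.g. semaphores, shared memory) directly.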
if os.name == "posix":
    from multiprocessing import resource_tracker

    def _resource_unlink(name, rtype):
        resource_tracker._CLEANUP_FUNCS[rtype](name)


#
# Constants
#

LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

DELTA = 0.1
CHECK_TIMINGS = False     # setting this to True makes the tests take a lot
                          # longer and can cause occasional non-serious
                          # failures, because some calls block a bit longer
                          # than expected
if CHECK_TIMINGS:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
else:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1

# BaseManager.shutdown_timeout
SHUTDOWN_TIMEOUT = support.SHORT_TIMEOUT

WAIT_ACTIVE_CHILDREN_TIMEOUT = 5.0

HAVE_GETVALUE = not getattr(_multiprocessing,
                            'HAVE_BROKEN_SEM_GETVALUE', False)

WIN32 = (sys.platform == "win32")

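# multiprocessing.connection.wait() treats a None timeout as "block forever",
# so map negative timeouts to None.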
def wait_for_handle(handle, timeout):
    if timeout is not None and timeout < 0.0:
        timeout = None
    return wait([handle], timeout)

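# Highest file descriptor number the OS allows; fall back to a conservative
# default where sysconf() is unavailable (e.g. on Windows).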
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except Exception:
    MAXFD = 256

# To speed up tests when using the forkserver, we can preload these:
PRELOAD = ['__main__', 'test.test_multiprocessing_forkserver']

#
# Some tests require ctypes
#

try:
    from ctypes import Structure, c_int, c_double, c_longlong
except ImportError:
    Structure = object
    c_int = c_double = c_longlong = None


def check_enough_semaphores():
    """Check that the system supports enough semaphores to run the test."""
    # minimum number of semaphores available according to POSIX
    nsems_min = 256
    try:
        nsems = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems == -1 or nsems >= nsems_min:
        return
    raise unittest.SkipTest("The OS doesn't support enough semaphores "
                            "to run the test (required: %d)." % nsems_min)


def only_run_in_spawn_testsuite(reason):
    """Returns a decorator: raises SkipTest when the start method != spawn at test time.

    This can be useful to save overall Python test suite execution time.
    "spawn" is the universal start method available on all platforms, so this
    limits the decorated test to executing only within
    test_multiprocessing_spawn.

    This would not be necessary if the test suite were refactored to move
    tests that are not start-method specific into separate files, instead of
    rerunning them under every start method.
    """

    def decorator(test_item):

        @functools.wraps(test_item)
        def spawn_check_wrapper(*args, **kwargs):
            if (start_method := multiprocessing.get_start_method()) != "spawn":
                raise unittest.SkipTest(f"{start_method=}, not 'spawn'; {reason}")
            return test_item(*args, **kwargs)

        return spawn_check_wrapper

    return decorator


class TestInternalDecorators(unittest.TestCase):
    """Logic within a test suite that could errantly skip tests? Test it!"""

    @unittest.skipIf(sys.platform == "win32", "test requires that fork exists.")
    def test_only_run_in_spawn_testsuite(self):
        if multiprocessing.get_start_method() != "spawn":
            raise unittest.SkipTest("only run in test_multiprocessing_spawn.")

        try:
            @only_run_in_spawn_testsuite("testing this decorator")
            def return_four_if_spawn():
                return 4
        except Exception as err:
            self.fail(f"expected decorated `def` not to raise; caught {err}")

        orig_start_method = multiprocessing.get_start_method(allow_none=True)
        try:
            multiprocessing.set_start_method("spawn", force=True)
            self.assertEqual(return_four_if_spawn(), 4)
            multiprocessing.set_start_method("fork", force=True)
            with self.assertRaises(unittest.SkipTest) as ctx:
                return_four_if_spawn()
            self.assertIn("testing this decorator", str(ctx.exception))
            self.assertIn("start_method=", str(ctx.exception))
        finally:
            multiprocessing.set_start_method(orig_start_method, force=True)


#
# Creates a wrapper for a function which records the time it takes to finish
#

class TimingWrapper(object):

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        t = time.monotonic()
        try:
            return self.func(*args, **kwds)
        finally:
            self.elapsed = time.monotonic() - t

#
# Base class for test cases
#

class BaseTestCase(object):

    ALLOWED_TYPES = ('processes', 'manager', 'threads')
    # If not empty, limit which start method suites run this class.
    START_METHODS: set[str] = set()
    start_method = None  # set by install_tests_in_module_dict()

    def assertTimingAlmostEqual(self, a, b):
        if CHECK_TIMINGS:
            self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        try:
            res = func(*args)
        except NotImplementedError:
            pass
        else:
            return self.assertEqual(value, res)

    # For the sanity of Windows users: fail fast with a clear error rather
    # than crashing or freezing in multiple ways if a test case gets pickled.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__

#
# Return the value of a semaphore
#
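# Try the public get_value() first, then fall back to the private attributes
# used by some semaphore implementations (e.g. threading.Semaphore's _value).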

def get_value(self):
    try:
        return self.get_value()
    except AttributeError:
        try:
            return self._Semaphore__value
        except AttributeError:
            try:
                return self._value
            except AttributeError:
                raise NotImplementedError

#
# Testcases
#

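# A picklable callable; test_lose_target_ref uses it to check that Process
# drops its reference to the target once the child has finished.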
class DummyCallable:
    def __call__(self, q, c):
        assert isinstance(c, DummyCallable)
        q.put(5)


class _TestProcess(BaseTestCase):

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertTrue(not current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    def test_set_executable(self):
        if self.TYPE == 'threads':
            self.skipTest(f'test not appropriate for {self.TYPE}')
        paths = [
            sys.executable,               # str
            os.fsencode(sys.executable),  # bytes
            os_helper.FakePath(sys.executable),  # os.PathLike
            os_helper.FakePath(os.fsencode(sys.executable)),  # os.PathLike bytes
        ]
        for path in paths:
            self.set_executable(path)
            p = self.Process()
            p.start()
            p.join()
            self.assertEqual(p.exitcode, 0)

    @support.requires_resource('cpu')
    def test_args_argument(self):
        # bpo-45735: Passing *args* to the constructor as a list or as a
        # tuple should have the same effect.
        args_cases = (1, "str", [1], (1,))
        args_types = (list, tuple)

        test_cases = itertools.product(args_cases, args_types)

        for args, args_type in test_cases:
            with self.subTest(args=args, args_type=args_type):
                q = self.Queue(1)
                # pass a tuple or list as args
                p = self.Process(target=self._test_args, args=args_type((q, args)))
                p.daemon = True
                p.start()
                child_args = q.get()
                self.assertEqual(child_args, args)
                p.join()
                close_queue(q)

    @classmethod
    def _test_args(cls, q, arg):
        q.put(arg)

    def test_daemon_argument(self):
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        # By default uses the current process's daemon flag.
        proc0 = self.Process(target=self._test)
        self.assertEqual(proc0.daemon, self.current_process().daemon)
        proc1 = self.Process(target=self._test, daemon=True)
        self.assertTrue(proc1.daemon)
        proc2 = self.Process(target=self._test, daemon=False)
        self.assertFalse(proc2.daemon)

    @classmethod
    def _test(cls, q, *args, **kwds):
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_parent_process_attributes(self):
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        self.assertIsNone(self.parent_process())

        rconn, wconn = self.Pipe(duplex=False)
        p = self.Process(target=self._test_send_parent_process, args=(wconn,))
        p.start()
        p.join()
        parent_pid, parent_name = rconn.recv()
        self.assertEqual(parent_pid, self.current_process().pid)
        self.assertEqual(parent_pid, os.getpid())
        self.assertEqual(parent_name, self.current_process().name)

    @classmethod
    def _test_send_parent_process(cls, wconn):
        from multiprocessing.process import parent_process
        wconn.send([parent_process().pid, parent_process().name])

    def test_parent_process(self):
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        # Launch a child process. Make it launch a grandchild process. Kill
        # the child process and make sure that the grandchild notices the
        # death of its parent (i.e. the child process).
        rconn, wconn = self.Pipe(duplex=False)
        p = self.Process(
            target=self._test_create_grandchild_process, args=(wconn, ))
        p.start()

        if not rconn.poll(timeout=support.LONG_TIMEOUT):
            raise AssertionError("Could not communicate with child process")
        parent_process_status = rconn.recv()
        self.assertEqual(parent_process_status, "alive")

        p.terminate()
        p.join()

        if not rconn.poll(timeout=support.LONG_TIMEOUT):
            raise AssertionError("Could not communicate with child process")
        parent_process_status = rconn.recv()
        self.assertEqual(parent_process_status, "not alive")

    @classmethod
    def _test_create_grandchild_process(cls, wconn):
        p = cls.Process(target=cls._test_report_parent_status, args=(wconn, ))
        p.start()
        time.sleep(300)

    @classmethod
    def _test_report_parent_status(cls, wconn):
        from multiprocessing.process import parent_process
        wconn.send("alive" if parent_process().is_alive() else "not alive")
        parent_process().join(timeout=support.SHORT_TIMEOUT)
        wconn.send("alive" if parent_process().is_alive() else "not alive")

    def test_process(self):
        q = self.Queue(1)
        e = self.Event()
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.exitcode, None)

        p.start()

        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())
        close_queue(q)

    @unittest.skipUnless(threading._HAVE_THREAD_NATIVE_ID, "needs native_id")
    def test_process_mainthread_native_id(self):
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        current_mainthread_native_id = threading.main_thread().native_id

        q = self.Queue(1)
        p = self.Process(target=self._test_process_mainthread_native_id, args=(q,))
        p.start()

        child_mainthread_native_id = q.get()
        p.join()
        close_queue(q)

        self.assertNotEqual(current_mainthread_native_id, child_mainthread_native_id)

    @classmethod
    def _test_process_mainthread_native_id(cls, q):
        mainthread_native_id = threading.main_thread().native_id
        q.put(mainthread_native_id)

    @classmethod
    def _sleep_some(cls):
        time.sleep(100)

    @classmethod
    def _test_sleep(cls, delay):
        time.sleep(delay)

    def _kill_process(self, meth):
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        p = self.Process(target=self._sleep_some)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        join = TimingWrapper(p.join)

        self.assertEqual(join(0), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)
        self.assertEqual(p.is_alive(), True)

        self.assertEqual(join(-1), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)
        self.assertEqual(p.is_alive(), True)

        # XXX maybe terminating too soon causes the problems on Gentoo...
        time.sleep(1)

        meth(p)

        if hasattr(signal, 'alarm'):
            # On the Gentoo buildbot waitpid() often seems to block forever.
            # We use alarm() to interrupt it if it blocks for too long.
            def handler(*args):
                raise RuntimeError('join took too long: %s' % p)
            old_handler = signal.signal(signal.SIGALRM, handler)
            try:
                signal.alarm(10)
                self.assertEqual(join(), None)
            finally:
                signal.alarm(0)
                signal.signal(signal.SIGALRM, old_handler)
        else:
            self.assertEqual(join(), None)

        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        return p.exitcode

    def test_terminate(self):
        exitcode = self._kill_process(multiprocessing.Process.terminate)
        self.assertEqual(exitcode, -signal.SIGTERM)

    def test_kill(self):
        exitcode = self._kill_process(multiprocessing.Process.kill)
        if os.name != 'nt':
            self.assertEqual(exitcode, -signal.SIGKILL)
        else:
            self.assertEqual(exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.daemon = True
        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

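        # Each child echoes its id before spawning two more children (up to
        # depth 2); the indentation below mirrors the expected recursion tree.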
        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)

    @classmethod
    def _test_sentinel(cls, event):
        event.wait(10.0)

    def test_sentinel(self):
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        event = self.Event()
        p = self.Process(target=self._test_sentinel, args=(event,))
        with self.assertRaises(ValueError):
            p.sentinel
        p.start()
        self.addCleanup(p.join)
        sentinel = p.sentinel
        self.assertIsInstance(sentinel, int)
        self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
        event.set()
        p.join()
        self.assertTrue(wait_for_handle(sentinel, timeout=1))

    @classmethod
    def _test_close(cls, rc=0, q=None):
        if q is not None:
            q.get()
        sys.exit(rc)

    def test_close(self):
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        q = self.Queue()
        p = self.Process(target=self._test_close, kwargs={'q': q})
        p.daemon = True
        p.start()
        self.assertEqual(p.is_alive(), True)
        # Child is still alive, cannot close
        with self.assertRaises(ValueError):
            p.close()

        q.put(None)
        p.join()
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.exitcode, 0)
        p.close()
        with self.assertRaises(ValueError):
            p.is_alive()
        with self.assertRaises(ValueError):
            p.join()
        with self.assertRaises(ValueError):
            p.terminate()
        p.close()

        wr = weakref.ref(p)
        del p
        gc.collect()
        self.assertIs(wr(), None)

        close_queue(q)

    @support.requires_resource('walltime')
    def test_many_processes(self):
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sm = multiprocessing.get_start_method()
        N = 5 if sm == 'spawn' else 100

        # Try to overwhelm the forkserver loop with events
        procs = [self.Process(target=self._test_sleep, args=(0.01,))
                 for i in range(N)]
        for p in procs:
            p.start()
        for p in procs:
            join_process(p)
        for p in procs:
            self.assertEqual(p.exitcode, 0)

        procs = [self.Process(target=self._sleep_some)
                 for i in range(N)]
        for p in procs:
            p.start()
        time.sleep(0.001)  # let the children start...
        for p in procs:
            p.terminate()
        for p in procs:
            join_process(p)
        if os.name != 'nt':
            exitcodes = [-signal.SIGTERM]
            if sys.platform == 'darwin':
                # bpo-31510: On macOS, killing a freshly started process with
                # SIGTERM sometimes kills the process with SIGKILL.
                exitcodes.append(-signal.SIGKILL)
            for p in procs:
                self.assertIn(p.exitcode, exitcodes)

    def test_lose_target_ref(self):
        c = DummyCallable()
        wr = weakref.ref(c)
        q = self.Queue()
        p = self.Process(target=c, args=(q, c))
        del c
        p.start()
        p.join()
        gc.collect()  # For PyPy or other GCs.
        self.assertIs(wr(), None)
        self.assertEqual(q.get(), 5)
        close_queue(q)

    @classmethod
    def _test_child_fd_inflation(cls, evt, q):
        q.put(os_helper.fd_count())
        evt.wait()

    def test_child_fd_inflation(self):
        # Number of fds in child processes should not grow with the
        # number of running children.
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sm = multiprocessing.get_start_method()
        if sm == 'fork':
            # The fork method by design inherits all fds from the parent,
            # trying to go against it is a lost battle
            self.skipTest('test not appropriate for {}'.format(sm))

        N = 5
        evt = self.Event()
        q = self.Queue()

        procs = [self.Process(target=self._test_child_fd_inflation, args=(evt, q))
                 for i in range(N)]
        for p in procs:
            p.start()

        try:
            fd_counts = [q.get() for i in range(N)]
            self.assertEqual(len(set(fd_counts)), 1, fd_counts)

        finally:
            evt.set()
            for p in procs:
                p.join()
            close_queue(q)

    @classmethod
    def _test_wait_for_threads(cls, evt):
        def func1():
            time.sleep(0.5)
            evt.set()

        def func2():
            time.sleep(20)
            evt.clear()

        threading.Thread(target=func1).start()
        threading.Thread(target=func2, daemon=True).start()

    def test_wait_for_threads(self):
        # A child process should wait for non-daemonic threads to end
        # before exiting
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        evt = self.Event()
        proc = self.Process(target=self._test_wait_for_threads, args=(evt,))
        proc.start()
        proc.join()
        self.assertTrue(evt.is_set())

    @classmethod
    def _test_error_on_stdio_flush(cls, evt, break_std_streams={}):
        # Break the requested standard streams: replace each one with a
        # closed file-like object, or remove it entirely.
        for stream_name, action in break_std_streams.items():
            if action == 'close':
                stream = io.StringIO()
                stream.close()
            else:
                assert action == 'remove'
                stream = None
            setattr(sys, stream_name, stream)
        evt.set()

    def test_error_on_stdio_flush_1(self):
        # Check that Process works with broken standard streams
        streams = [io.StringIO(), None]
        streams[0].close()
        for stream_name in ('stdout', 'stderr'):
            for stream in streams:
                old_stream = getattr(sys, stream_name)
                setattr(sys, stream_name, stream)
                try:
                    evt = self.Event()
                    proc = self.Process(target=self._test_error_on_stdio_flush,
                                        args=(evt,))
                    proc.start()
                    proc.join()
                    self.assertTrue(evt.is_set())
                    self.assertEqual(proc.exitcode, 0)
                finally:
                    setattr(sys, stream_name, old_stream)

    def test_error_on_stdio_flush_2(self):
        # Same as test_error_on_stdio_flush_1(), but standard streams are
        # broken by the child process
        for stream_name in ('stdout', 'stderr'):
            for action in ('close', 'remove'):
                old_stream = getattr(sys, stream_name)
                try:
                    evt = self.Event()
                    proc = self.Process(target=self._test_error_on_stdio_flush,
                                        args=(evt, {stream_name: action}))
                    proc.start()
                    proc.join()
                    self.assertTrue(evt.is_set())
                    self.assertEqual(proc.exitcode, 0)
                finally:
                    setattr(sys, stream_name, old_stream)

    @classmethod
    def _sleep_and_set_event(cls, evt, delay=0.0):
        time.sleep(delay)
        evt.set()

    def check_forkserver_death(self, signum):
        # bpo-31308: if the forkserver process has died, we should still
        # be able to create and run new Process instances (the forkserver
        # is implicitly restarted).
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        sm = multiprocessing.get_start_method()
        if sm != 'forkserver':
            # This test exercises the forkserver's implicit restart, so it
            # only makes sense for the forkserver start method.
            self.skipTest('test not appropriate for {}'.format(sm))

        from multiprocessing.forkserver import _forkserver
        _forkserver.ensure_running()

        # First process sleeps 500 ms
        delay = 0.5

        evt = self.Event()
        proc = self.Process(target=self._sleep_and_set_event, args=(evt, delay))
        proc.start()

        pid = _forkserver._forkserver_pid
        os.kill(pid, signum)
        # give the forkserver time to die and proc time to complete
        time.sleep(delay * 2.0)

        evt2 = self.Event()
        proc2 = self.Process(target=self._sleep_and_set_event, args=(evt2,))
        proc2.start()
        proc2.join()
        self.assertTrue(evt2.is_set())
        self.assertEqual(proc2.exitcode, 0)

        proc.join()
        self.assertTrue(evt.is_set())
        self.assertIn(proc.exitcode, (0, 255))

    def test_forkserver_sigint(self):
        # Catchable signal
        self.check_forkserver_death(signal.SIGINT)

    def test_forkserver_sigkill(self):
        # Uncatchable signal
        if os.name != 'nt':
            self.check_forkserver_death(signal.SIGKILL)


#
#
#

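# A Process subclass that upper-cases strings received over a pipe; used by
# _TestSubclassingProcess to check that run() can be overridden.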
class _UpperCaser(multiprocessing.Process):

    def __init__(self):
        multiprocessing.Process.__init__(self)
        self.child_conn, self.parent_conn = multiprocessing.Pipe()

    def run(self):
        self.parent_conn.close()
        for s in iter(self.child_conn.recv, None):
            self.child_conn.send(s.upper())
        self.child_conn.close()

    def submit(self, s):
        assert type(s) is str
        self.parent_conn.send(s)
        return self.parent_conn.recv()

    def stop(self):
        self.parent_conn.send(None)
        self.parent_conn.close()
        self.child_conn.close()

class _TestSubclassingProcess(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        uppercaser = _UpperCaser()
        uppercaser.daemon = True
        uppercaser.start()
        self.assertEqual(uppercaser.submit('hello'), 'HELLO')
        self.assertEqual(uppercaser.submit('world'), 'WORLD')
        uppercaser.stop()
        uppercaser.join()

    def test_stderr_flush(self):
        # sys.stderr is flushed at process shutdown (issue #13812)
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        testfn = os_helper.TESTFN
        self.addCleanup(os_helper.unlink, testfn)
        proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
        proc.start()
        proc.join()
        with open(testfn, encoding="utf-8") as f:
            err = f.read()
            # The whole traceback was printed
            self.assertIn("ZeroDivisionError", err)
            self.assertIn("test_multiprocessing.py", err)
            self.assertIn("1/0 # MARKER", err)

    @classmethod
    def _test_stderr_flush(cls, testfn):
        fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
        sys.stderr = open(fd, 'w', encoding="utf-8", closefd=False)
        1/0 # MARKER


    @classmethod
    def _test_sys_exit(cls, reason, testfn):
        fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
        sys.stderr = open(fd, 'w', encoding="utf-8", closefd=False)
        sys.exit(reason)

    def test_sys_exit(self):
        # See Issue 13854
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        testfn = os_helper.TESTFN
        self.addCleanup(os_helper.unlink, testfn)

        for reason in (
            [1, 2, 3],
            'ignore this',
        ):
            p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
            p.daemon = True
            p.start()
            join_process(p)
            self.assertEqual(p.exitcode, 1)

            with open(testfn, encoding="utf-8") as f:
                content = f.read()
            self.assertEqual(content.rstrip(), str(reason))

            os.unlink(testfn)

        cases = [
            ((True,), 1),
            ((False,), 0),
            ((8,), 8),
            ((None,), 0),
            ((), 0),
            ]

        for args, expected in cases:
            with self.subTest(args=args):
                p = self.Process(target=sys.exit, args=args)
                p.daemon = True
                p.start()
                join_process(p)
                self.assertEqual(p.exitcode, expected)

#
#
#

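# Some queue-like objects exercised below may not implement empty() or
# full(); fall back to comparing qsize() in that case.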
def queue_empty(q):
    if hasattr(q, 'empty'):
        return q.empty()
    else:
        return q.qsize() == 0

def queue_full(q, maxsize):
    if hasattr(q, 'full'):
        return q.full()
    else:
        return q.qsize() == maxsize


class _TestQueue(BaseTestCase):


    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()
        close_queue(queue)

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()
        close_queue(queue)

    @classmethod
    def _test_fork(cls, queue):
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shut down until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.daemon = True
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()
        close_queue(queue)

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            self.skipTest('qsize method not implemented')
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)
        close_queue(q)

    @classmethod
    def _test_task_done(cls, q):
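        # Consume items until the None sentinel arrives, acknowledging each
        # one so that queue.join() in the parent can return.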
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in range(4)]

        for p in workers:
            p.daemon = True
            p.start()

        for i in range(10):
            queue.put(i)

        queue.join()

        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()
        close_queue(queue)

    def test_no_import_lock_contention(self):
        with os_helper.temp_cwd():
            module_name = 'imported_by_an_imported_module'
            with open(module_name + '.py', 'w', encoding="utf-8") as f:
                f.write("""if 1:
                    import multiprocessing

                    q = multiprocessing.Queue()
                    q.put('knock knock')
                    q.get(timeout=3)
                    q.close()
                    del q
                """)

            with import_helper.DirsOnSysPath(os.getcwd()):
                try:
                    __import__(module_name)
                except pyqueue.Empty:
                    self.fail("Probable regression on import lock contention;"
                              " see Issue #22853")

    def test_timeout(self):
        q = multiprocessing.Queue()
        start = time.monotonic()
        self.assertRaises(pyqueue.Empty, q.get, True, 0.200)
        delta = time.monotonic() - start
        # bpo-30317: Tolerate a delta of 100 ms because of the bad clock
        # resolution on Windows (usually 15.6 ms). x86 Windows7 3.x once
        # failed because the delta was only 135.8 ms.
        self.assertGreaterEqual(delta, 0.100)
        close_queue(q)

    def test_queue_feeder_donot_stop_onexc(self):
        # bpo-30414: verify feeder handles exceptions correctly
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        class NotSerializable(object):
            def __reduce__(self):
                raise AttributeError
        with test.support.captured_stderr():
            q = self.Queue()
            q.put(NotSerializable())
            q.put(True)
            self.assertTrue(q.get(timeout=support.SHORT_TIMEOUT))
            close_queue(q)

        with test.support.captured_stderr():
            # bpo-33078: verify that the queue size is correctly handled
            # on errors.
            q = self.Queue(maxsize=1)
            q.put(NotSerializable())
            q.put(True)
            try:
                self.assertEqual(q.qsize(), 1)
            except NotImplementedError:
                # qsize is not available on all platforms as it
                # relies on sem_getvalue
                pass
            self.assertTrue(q.get(timeout=support.SHORT_TIMEOUT))
            # Check that the size of the queue is correct
            self.assertTrue(q.empty())
            close_queue(q)

    def test_queue_feeder_on_queue_feeder_error(self):
        # bpo-30006: verify feeder handles exceptions using the
        # _on_queue_feeder_error hook.
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        class NotSerializable(object):
            """Mock unserializable object"""
            def __init__(self):
                self.reduce_was_called = False
                self.on_queue_feeder_error_was_called = False

            def __reduce__(self):
                self.reduce_was_called = True
                raise AttributeError

        class SafeQueue(multiprocessing.queues.Queue):
            """Queue with overloaded _on_queue_feeder_error hook"""
            @staticmethod
            def _on_queue_feeder_error(e, obj):
                if (isinstance(e, AttributeError) and
                        isinstance(obj, NotSerializable)):
                    obj.on_queue_feeder_error_was_called = True

        not_serializable_obj = NotSerializable()
        # The captured_stderr reduces the noise in the test report
        with test.support.captured_stderr():
            q = SafeQueue(ctx=multiprocessing.get_context())
            q.put(not_serializable_obj)

            # Verify that q is still functioning correctly
            q.put(True)
            self.assertTrue(q.get(timeout=support.SHORT_TIMEOUT))

        # Assert that the serialization and the hook have been called correctly
        self.assertTrue(not_serializable_obj.reduce_was_called)
        self.assertTrue(not_serializable_obj.on_queue_feeder_error_was_called)

    def test_closed_queue_empty_exceptions(self):
        # Assert that checking the emptiness of an unused closed queue
        # does not raise an OSError. The rationale is that q.close() is
        # a no-op upon construction and becomes effective once the queue
        # has been used (e.g., by calling q.put()).
        for q in multiprocessing.Queue(), multiprocessing.JoinableQueue():
            q.close()  # this is a no-op since the feeder thread is None
            q.join_thread()  # this is also a no-op
            self.assertTrue(q.empty())

        for q in multiprocessing.Queue(), multiprocessing.JoinableQueue():
            q.put('foo')  # make sure that the queue is 'used'
            q.close()  # close the feeder thread
            q.join_thread()  # make sure to join the feeder thread
            with self.assertRaisesRegex(OSError, 'is closed'):
                q.empty()

    def test_closed_queue_put_get_exceptions(self):
        for q in multiprocessing.Queue(), multiprocessing.JoinableQueue():
            q.close()
            with self.assertRaisesRegex(ValueError, 'is closed'):
                q.put('foo')
            with self.assertRaisesRegex(ValueError, 'is closed'):
                q.get()
#
#
#

class _TestLock(BaseTestCase):

    @staticmethod
    def _acquire(lock, l=None):
        lock.acquire()
        if l is not None:
            l.append(repr(lock))

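    # Acquire the lock, signal the parent, then keep holding the lock so the
    # parent can inspect its repr() while it is owned elsewhere.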
    @staticmethod
    def _acquire_event(lock, event):
        lock.acquire()
        event.set()
        time.sleep(1.0)

    def test_repr_lock(self):
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        lock = self.Lock()
        self.assertEqual('<Lock(owner=None)>', repr(lock))

        lock.acquire()
        self.assertEqual('<Lock(owner=MainProcess)>', repr(lock))
        lock.release()

        tname = 'T1'
        l = []
        t = threading.Thread(target=self._acquire,
                             args=(lock, l),
                             name=tname)
        t.start()
        time.sleep(0.1)
        self.assertEqual(f'<Lock(owner=MainProcess|{tname})>', l[0])
        lock.release()

        t = threading.Thread(target=self._acquire,
                             args=(lock,),
                             name=tname)
        t.start()
        time.sleep(0.1)
        self.assertEqual('<Lock(owner=SomeOtherThread)>', repr(lock))
        lock.release()

        pname = 'P1'
        l = multiprocessing.Manager().list()
        p = self.Process(target=self._acquire,
                         args=(lock, l),
                         name=pname)
        p.start()
        p.join()
        self.assertEqual(f'<Lock(owner={pname})>', l[0])

        lock = self.Lock()
        event = self.Event()
        p = self.Process(target=self._acquire_event,
                         args=(lock, event),
                         name='P2')
        p.start()
        event.wait()
        self.assertEqual('<Lock(owner=SomeOtherProcess)>', repr(lock))
        p.terminate()

    def test_lock(self):
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    @staticmethod
    def _acquire_release(lock, timeout, l=None, n=1):
        for _ in range(n):
            lock.acquire()
        if l is not None:
            l.append(repr(lock))
        time.sleep(timeout)
        for _ in range(n):
            lock.release()

    def test_repr_rlock(self):
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        lock = self.RLock()
        self.assertEqual('<RLock(None, 0)>', repr(lock))

        n = 3
        for _ in range(n):
            lock.acquire()
        self.assertEqual(f'<RLock(MainProcess, {n})>', repr(lock))
        for _ in range(n):
            lock.release()

        t, l = [], []
        for i in range(n):
            t.append(threading.Thread(target=self._acquire_release,
                                      args=(lock, 0.1, l, i+1),
                                      name=f'T{i+1}'))
            t[-1].start()
        for t_ in t:
            t_.join()
        for i in range(n):
            self.assertIn(f'<RLock(MainProcess|T{i+1}, {i+1})>', l)


        t = threading.Thread(target=self._acquire_release,
                                 args=(lock, 0.2),
                                 name='T1')
        t.start()
        time.sleep(0.1)
        self.assertEqual('<RLock(SomeOtherThread, nonzero)>', repr(lock))
        time.sleep(0.2)

        pname = 'P1'
        l = multiprocessing.Manager().list()
        p = self.Process(target=self._acquire_release,
                         args=(lock, 0.1, l),
                         name=pname)
        p.start()
        p.join()
        self.assertEqual(f'<RLock({pname}, 1)>', l[0])

        event = self.Event()
        lock = self.RLock()
        p = self.Process(target=self._acquire_event,
                         args=(lock, event))
        p.start()
        event.wait()
        self.assertEqual('<RLock(SomeOtherProcess, nonzero)>', repr(lock))
        p.join()

    def test_rlock(self):
        lock = self.RLock()
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.release(), None)
        self.assertEqual(lock.release(), None)
        self.assertEqual(lock.release(), None)
        self.assertRaises((AssertionError, RuntimeError), lock.release)

    def test_lock_context(self):
        with self.Lock():
            pass


class _TestSemaphore(BaseTestCase):

    def _test_semaphore(self, sem):
        self.assertReturnsIfImplemented(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(2, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(3, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(4, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on macOS
1542        #if HAVE_GETVALUE:
1543        #    self.assertRaises(ValueError, sem.release)
1544        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        self.assertEqual(acquire(False), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, None), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0)

        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)


class _TestCondition(BaseTestCase):

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def assertReachesEventually(self, func, value):
        for i in range(10):
            try:
                if func() == value:
                    break
            except NotImplementedError:
                break
            time.sleep(DELTA)
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(value, func)

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        p = self.Process(target=self.f, args=(cond, sleeping, woken))
        p.daemon = True
        p.start()
        self.addCleanup(p.join)

        p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        p.daemon = True
        p.start()
        self.addCleanup(p.join)

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)
        p.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()
            self.addCleanup(p.join)

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()
            self.addCleanup(t.join)

        # wait for them all to sleep
        for i in range(6):
            sleeping.acquire()

        # check they have all timed out
        for i in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()
            self.addCleanup(p.join)

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()
            self.addCleanup(t.join)

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        self.assertReachesEventually(lambda: get_value(woken), 6)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_notify_n(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()
            self.addCleanup(p.join)

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()
            self.addCleanup(t.join)

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake some of them up
        cond.acquire()
        cond.notify(n=2)
        cond.release()

        # check 2 have woken
        self.assertReachesEventually(lambda: get_value(woken), 2)

        # wake the rest of them
        cond.acquire()
        cond.notify(n=4)
        cond.release()

        self.assertReachesEventually(lambda: get_value(woken), 6)

        # notifying again has no further effect; everyone is already awake
        cond.acquire()
        cond.notify(n=3)
        cond.release()

        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

    @classmethod
    def _test_waitfor_f(cls, cond, state):
        with cond:
            state.value = 0
            cond.notify()
            result = cond.wait_for(lambda : state.value==4)
            if not result or state.value != 4:
                sys.exit(1)

    @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
    def test_waitfor(self):
        # based on test in test/lock_tests.py
        cond = self.Condition()
        state = self.Value('i', -1)

        p = self.Process(target=self._test_waitfor_f, args=(cond, state))
        p.daemon = True
        p.start()

        with cond:
            result = cond.wait_for(lambda : state.value==0)
            self.assertTrue(result)
            self.assertEqual(state.value, 0)

        for i in range(4):
            time.sleep(0.01)
            with cond:
                state.value += 1
                cond.notify()

        join_process(p)
        self.assertEqual(p.exitcode, 0)

    @classmethod
    def _test_waitfor_timeout_f(cls, cond, state, success, sem):
        sem.release()
        with cond:
            expected = 0.100
            dt = time.monotonic()
            result = cond.wait_for(lambda : state.value==4, timeout=expected)
            dt = time.monotonic() - dt
            if not result and (expected - CLOCK_RES) <= dt:
                success.value = True

    @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
    def test_waitfor_timeout(self):
        # based on test in test/lock_tests.py
        cond = self.Condition()
        state = self.Value('i', 0)
        success = self.Value('i', False)
        sem = self.Semaphore(0)

        p = self.Process(target=self._test_waitfor_timeout_f,
                         args=(cond, state, success, sem))
        p.daemon = True
        p.start()
        self.assertTrue(sem.acquire(timeout=support.LONG_TIMEOUT))

        # Only increment 3 times, so state == 4 is never reached.
        for i in range(3):
            time.sleep(0.010)
            with cond:
                state.value += 1
                cond.notify()

        join_process(p)
        self.assertTrue(success.value)

    @classmethod
    def _test_wait_result(cls, c, pid):
        with c:
            c.notify()
        time.sleep(1)
        if pid is not None:
            os.kill(pid, signal.SIGINT)

    def test_wait_result(self):
        if isinstance(self, ProcessesMixin) and sys.platform != 'win32':
            pid = os.getpid()
        else:
            pid = None

        c = self.Condition()
        with c:
            self.assertFalse(c.wait(0))
            self.assertFalse(c.wait(0.1))

            p = self.Process(target=self._test_wait_result, args=(c, pid))
            p.start()

            self.assertTrue(c.wait(60))
            if pid is not None:
                self.assertRaises(KeyboardInterrupt, c.wait, 60)

            p.join()


class _TestEvent(BaseTestCase):

    @classmethod
    def _test_event(cls, event):
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # Temporarily removed due to API shear: this does not work with
        # threading._Event objects (is_set == isSet).
        self.assertEqual(event.is_set(), False)

        # Removed: threading.Event.wait() returns the value of its __flag
        # instead of None; API shear with the semaphore-backed mp.Event.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # See note above on the API differences
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        p.join()

    def test_repr(self) -> None:
        event = self.Event()
        if self.TYPE == 'processes':
            self.assertRegex(repr(event), r"<Event at .* unset>")
            event.set()
            self.assertRegex(repr(event), r"<Event at .* set>")
            event.clear()
            self.assertRegex(repr(event), r"<Event at .* unset>")
        elif self.TYPE == 'manager':
            self.assertRegex(repr(event), r"<EventProxy object, typeid 'Event' at .*")
            event.set()
            self.assertRegex(repr(event), r"<EventProxy object, typeid 'Event' at .*")

#
# Tests for Barrier - adapted from tests in test/lock_tests.py
#

# Many of the tests for threading.Barrier use a list as an atomic
# counter: a value is appended to increment the counter, and the
# length of the list gives the value.  We use the class DummyList
# for the same purpose.

class _DummyList(object):

    def __init__(self):
        wrapper = multiprocessing.heap.BufferWrapper(struct.calcsize('i'))
        lock = multiprocessing.Lock()
        self.__setstate__((wrapper, lock))
        self._lengthbuf[0] = 0

    def __setstate__(self, state):
        (self._wrapper, self._lock) = state
        self._lengthbuf = self._wrapper.create_memoryview().cast('i')

    def __getstate__(self):
        return (self._wrapper, self._lock)

    def append(self, _):
        with self._lock:
            self._lengthbuf[0] += 1

    def __len__(self):
        with self._lock:
            return self._lengthbuf[0]
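
# Illustrative sketch (not used by the tests themselves): with _DummyList
# the counter pattern is append() to increment and len() to read, e.g.
#
#   counter = _DummyList()
#   counter.append(None)       # increment
#   assert len(counter) == 1   # read the current count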

def _wait():
    # A crude wait/yield function not relying on synchronization primitives.
    time.sleep(0.01)


class Bunch(object):
    """
    A bunch of threads.
    """
    def __init__(self, namespace, f, args, n, wait_before_exit=False):
        """
        Construct a bunch of `n` threads running the same function `f`.
        If `wait_before_exit` is True, the threads won't terminate until
        do_finish() is called.
        """
        self.f = f
        self.args = args
        self.n = n
        self.started = namespace.DummyList()
        self.finished = namespace.DummyList()
        self._can_exit = namespace.Event()
        if not wait_before_exit:
            self._can_exit.set()

        threads = []
        for i in range(n):
            p = namespace.Process(target=self.task)
            p.daemon = True
            p.start()
            threads.append(p)

        def finalize(threads):
            for p in threads:
                p.join()

        self._finalizer = weakref.finalize(self, finalize, threads)

    def task(self):
        pid = os.getpid()
        self.started.append(pid)
        try:
            self.f(*self.args)
        finally:
            self.finished.append(pid)
            self._can_exit.wait(30)
            assert self._can_exit.is_set()

    def wait_for_started(self):
        while len(self.started) < self.n:
            _wait()

    def wait_for_finished(self):
        while len(self.finished) < self.n:
            _wait()

    def do_finish(self):
        self._can_exit.set()

    def close(self):
        self._finalizer()
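
# Typical Bunch usage (a sketch of the pattern; run_threads() in
# _TestBarrier below does exactly this): start n-1 background copies of f,
# call f once in the foreground, then wait for everyone to finish.
#
#   b = Bunch(namespace, f, args, n - 1)
#   try:
#       f(*args)
#       b.wait_for_finished()
#   finally:
#       b.close()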


class AppendTrue(object):
    def __init__(self, obj):
        self.obj = obj
    def __call__(self):
        self.obj.append(True)


class _TestBarrier(BaseTestCase):
    """
    Tests for Barrier objects.
    """
    N = 5
    defaultTimeout = 30.0  # XXX Slow Windows buildbots need generous timeout

    def setUp(self):
        self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout)

    def tearDown(self):
        self.barrier.abort()
        self.barrier = None

    def DummyList(self):
        if self.TYPE == 'threads':
            return []
        elif self.TYPE == 'manager':
            return self.manager.list()
        else:
            return _DummyList()

    def run_threads(self, f, args):
        b = Bunch(self, f, args, self.N-1)
        try:
            f(*args)
            b.wait_for_finished()
        finally:
            b.close()

    @classmethod
    def multipass(cls, barrier, results, n):
        m = barrier.parties
        assert m == cls.N
        for i in range(n):
            results[0].append(True)
            assert len(results[1]) == i * m
            barrier.wait()
            results[1].append(True)
            assert len(results[0]) == (i + 1) * m
            barrier.wait()
        try:
            assert barrier.n_waiting == 0
        except NotImplementedError:
            pass
        assert not barrier.broken

    def test_barrier(self, passes=1):
        """
        Test that a barrier is passed in lockstep
        """
        results = [self.DummyList(), self.DummyList()]
        self.run_threads(self.multipass, (self.barrier, results, passes))

    def test_barrier_10(self):
        """
        Test that a barrier works for 10 consecutive runs
        """
        self.test_barrier(10)

    @classmethod
    def _test_wait_return_f(cls, barrier, queue):
        res = barrier.wait()
        queue.put(res)

    def test_wait_return(self):
        """
        Test the return value from barrier.wait
        """
        queue = self.Queue()
        self.run_threads(self._test_wait_return_f, (self.barrier, queue))
        results = [queue.get() for i in range(self.N)]
        self.assertEqual(results.count(0), 1)
        close_queue(queue)

    @classmethod
    def _test_action_f(cls, barrier, results):
        barrier.wait()
        if len(results) != 1:
            raise RuntimeError

    def test_action(self):
        """
        Test the 'action' callback
        """
        results = self.DummyList()
        barrier = self.Barrier(self.N, action=AppendTrue(results))
        self.run_threads(self._test_action_f, (barrier, results))
        self.assertEqual(len(results), 1)

    @classmethod
    def _test_abort_f(cls, barrier, results1, results2):
        try:
            i = barrier.wait()
            if i == cls.N//2:
                raise RuntimeError
            barrier.wait()
            results1.append(True)
        except threading.BrokenBarrierError:
            results2.append(True)
        except RuntimeError:
            barrier.abort()

    def test_abort(self):
        """
        Test that an abort will put the barrier in a broken state
        """
        results1 = self.DummyList()
        results2 = self.DummyList()
        self.run_threads(self._test_abort_f,
                         (self.barrier, results1, results2))
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertTrue(self.barrier.broken)

    @classmethod
    def _test_reset_f(cls, barrier, results1, results2, results3):
        i = barrier.wait()
        if i == cls.N//2:
            # Wait until the other threads are all in the barrier.
            while barrier.n_waiting < cls.N-1:
                time.sleep(0.001)
            barrier.reset()
        else:
            try:
                barrier.wait()
                results1.append(True)
            except threading.BrokenBarrierError:
                results2.append(True)
        # Now, pass the barrier again
        barrier.wait()
        results3.append(True)

    def test_reset(self):
        """
        Test that a 'reset' on a barrier frees the waiting threads
        """
        results1 = self.DummyList()
        results2 = self.DummyList()
        results3 = self.DummyList()
        self.run_threads(self._test_reset_f,
                         (self.barrier, results1, results2, results3))
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    @classmethod
    def _test_abort_and_reset_f(cls, barrier, barrier2,
                                results1, results2, results3):
        try:
            i = barrier.wait()
            if i == cls.N//2:
                raise RuntimeError
            barrier.wait()
            results1.append(True)
        except threading.BrokenBarrierError:
            results2.append(True)
        except RuntimeError:
            barrier.abort()
        # Synchronize and reset the barrier.  Must synchronize first so
        # that everyone has left it when we reset, and after so that no
        # one enters it before the reset.
        if barrier2.wait() == cls.N//2:
            barrier.reset()
        barrier2.wait()
        barrier.wait()
        results3.append(True)

    def test_abort_and_reset(self):
        """
        Test that a barrier can be reset after being broken.
        """
        results1 = self.DummyList()
        results2 = self.DummyList()
        results3 = self.DummyList()
        barrier2 = self.Barrier(self.N)

        self.run_threads(self._test_abort_and_reset_f,
                         (self.barrier, barrier2, results1, results2, results3))
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    @classmethod
    def _test_timeout_f(cls, barrier, results):
        i = barrier.wait()
        if i == cls.N//2:
            # One thread is late!
            time.sleep(1.0)
        try:
            barrier.wait(0.5)
        except threading.BrokenBarrierError:
            results.append(True)

    def test_timeout(self):
        """
        Test wait(timeout)
        """
        results = self.DummyList()
        self.run_threads(self._test_timeout_f, (self.barrier, results))
        self.assertEqual(len(results), self.barrier.parties)

    @classmethod
    def _test_default_timeout_f(cls, barrier, results):
        i = barrier.wait(cls.defaultTimeout)
        if i == cls.N//2:
            # One thread is later than the default timeout
            time.sleep(1.0)
        try:
            barrier.wait()
        except threading.BrokenBarrierError:
            results.append(True)

    def test_default_timeout(self):
        """
        Test the barrier's default timeout
        """
        barrier = self.Barrier(self.N, timeout=0.5)
        results = self.DummyList()
        self.run_threads(self._test_default_timeout_f, (barrier, results))
        self.assertEqual(len(results), barrier.parties)

    def test_single_thread(self):
        b = self.Barrier(1)
        b.wait()
        b.wait()

    @classmethod
    def _test_thousand_f(cls, barrier, passes, conn, lock):
        for i in range(passes):
            barrier.wait()
            with lock:
                conn.send(i)

    def test_thousand(self):
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        passes = 1000
        lock = self.Lock()
        conn, child_conn = self.Pipe(False)
        for j in range(self.N):
            p = self.Process(target=self._test_thousand_f,
                             args=(self.barrier, passes, child_conn, lock))
            p.start()
            self.addCleanup(p.join)

        for i in range(passes):
            for j in range(self.N):
                self.assertEqual(conn.recv(), i)

#
#
#

class _TestValue(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('q', 2 ** 33, 2 ** 34),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        for sv, cv in zip(values, cls.codes_values):
            sv.value = cv[2]


    def test_value(self, raw=False):
        if raw:
            values = [self.RawValue(code, value)
                      for code, value, _ in self.codes_values]
        else:
            values = [self.Value(code, value)
                      for code, value, _ in self.codes_values]

        for sv, cv in zip(values, self.codes_values):
            self.assertEqual(sv.value, cv[1])

        proc = self.Process(target=self._test, args=(values,))
        proc.daemon = True
        proc.start()
        proc.join()

        for sv, cv in zip(values, self.codes_values):
            self.assertEqual(sv.value, cv[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        val1 = self.Value('i', 5)
        lock1 = val1.get_lock()
        obj1 = val1.get_obj()

        val2 = self.Value('i', 5, lock=None)
        lock2 = val2.get_lock()
        obj2 = val2.get_obj()

        lock = self.Lock()
        val3 = self.Value('i', 5, lock=lock)
        lock3 = val3.get_lock()
        obj3 = val3.get_obj()
        self.assertEqual(lock, lock3)

        arr4 = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(arr4, 'get_lock'))
        self.assertFalse(hasattr(arr4, 'get_obj'))

        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        arr5 = self.RawValue('i', 5)
        self.assertFalse(hasattr(arr5, 'get_lock'))
        self.assertFalse(hasattr(arr5, 'get_obj'))


class _TestArray(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        for i in range(1, len(seq)):
            seq[i] += seq[i-1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        if raw:
            arr = self.RawArray('i', seq)
        else:
            arr = self.Array('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])

        self.assertEqual(list(arr[:]), seq)

        self.f(seq)

        p = self.Process(target=self.f, args=(arr,))
        p.daemon = True
        p.start()
        p.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        size = 10
        # Test for zeroing (see issue #11675).
        # The repetition below strengthens the test by increasing the chances
        # of previously allocated non-zero memory being used for the new array
        # on the 2nd and 3rd loops.
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            self.assertEqual(list(arr), [0] * size)
            arr[:] = range(10)
            self.assertEqual(list(arr), list(range(10)))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        arr1 = self.Array('i', list(range(10)))
        lock1 = arr1.get_lock()
        obj1 = arr1.get_obj()

        arr2 = self.Array('i', list(range(10)), lock=None)
        lock2 = arr2.get_lock()
        obj2 = arr2.get_obj()

        lock = self.Lock()
        arr3 = self.Array('i', list(range(10)), lock=lock)
        lock3 = arr3.get_lock()
        obj3 = arr3.get_obj()
        self.assertEqual(lock, lock3)

        arr4 = self.Array('i', range(10), lock=False)
        self.assertFalse(hasattr(arr4, 'get_lock'))
        self.assertFalse(hasattr(arr4, 'get_obj'))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        arr5 = self.RawArray('i', range(10))
        self.assertFalse(hasattr(arr5, 'get_lock'))
        self.assertFalse(hasattr(arr5, 'get_obj'))

#
#
#

class _TestContainers(BaseTestCase):

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        a = self.list(list(range(10)))
        self.assertEqual(a[:], list(range(10)))

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(range(5)))
        self.assertEqual(b[:], list(range(5)))

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2,3,4])

        b *= 2
        self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])

        self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        self.assertEqual(a[:], list(range(10)))

        d = [a, b]
        e = self.list(d)
        self.assertEqual(
            [element[:] for element in e],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[0][:], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello'])

    def test_list_iter(self):
        a = self.list(list(range(10)))
        it = iter(a)
        self.assertEqual(list(it), list(range(10)))
        self.assertEqual(list(it), [])  # exhausted
        # list modified during iteration
        it = iter(a)
        a[0] = 100
        self.assertEqual(next(it), 100)

    def test_list_proxy_in_list(self):
        a = self.list([self.list(range(3)) for _i in range(3)])
        self.assertEqual([inner[:] for inner in a], [[0, 1, 2]] * 3)

        a[0][-1] = 55
        self.assertEqual(a[0][:], [0, 1, 55])
        for i in range(1, 3):
            self.assertEqual(a[i][:], [0, 1, 2])

        self.assertEqual(a[1].pop(), 2)
        self.assertEqual(len(a[1]), 2)
        for i in range(0, 3, 2):
            self.assertEqual(len(a[i]), 3)

        del a

        b = self.list()
        b.append(b)
        del b

    def test_dict(self):
        d = self.dict()
        indices = list(range(65, 70))
        for i in indices:
            d[i] = chr(i)
        self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
        self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])

    def test_dict_iter(self):
        d = self.dict()
        indices = list(range(65, 70))
        for i in indices:
            d[i] = chr(i)
        it = iter(d)
        self.assertEqual(list(it), indices)
        self.assertEqual(list(it), [])  # exhausted
        # dictionary changed size during iteration
        it = iter(d)
        d.clear()
        self.assertRaises(RuntimeError, next, it)

    def test_dict_proxy_nested(self):
        pets = self.dict(ferrets=2, hamsters=4)
        supplies = self.dict(water=10, feed=3)
        d = self.dict(pets=pets, supplies=supplies)

        self.assertEqual(supplies['water'], 10)
        self.assertEqual(d['supplies']['water'], 10)

        d['supplies']['blankets'] = 5
        self.assertEqual(supplies['blankets'], 5)
        self.assertEqual(d['supplies']['blankets'], 5)

        d['supplies']['water'] = 7
        self.assertEqual(supplies['water'], 7)
        self.assertEqual(d['supplies']['water'], 7)

        del pets
        del supplies
        self.assertEqual(d['pets']['ferrets'], 2)
        d['supplies']['blankets'] = 11
        self.assertEqual(d['supplies']['blankets'], 11)

        pets = d['pets']
        supplies = d['supplies']
        supplies['water'] = 7
        self.assertEqual(supplies['water'], 7)
        self.assertEqual(d['supplies']['water'], 7)

        d.clear()
        self.assertEqual(len(d), 0)
        self.assertEqual(supplies['water'], 7)
        self.assertEqual(pets['hamsters'], 4)

        l = self.list([pets, supplies])
        l[0]['marmots'] = 1
        self.assertEqual(pets['marmots'], 1)
        self.assertEqual(l[0]['marmots'], 1)

        del pets
        del supplies
        self.assertEqual(l[0]['marmots'], 1)

        outer = self.list([[88, 99], l])
        self.assertIsInstance(outer[0], list)  # Not a ListProxy
        self.assertEqual(outer[-1][-1]['feed'], 3)

    def test_nested_queue(self):
        a = self.list() # Test queue inside list
        a.append(self.Queue())
        a[0].put(123)
        self.assertEqual(a[0].get(), 123)
        b = self.dict() # Test queue inside dict
        b[0] = self.Queue()
        b[0].put(456)
        self.assertEqual(b[0].get(), 456)

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertTrue(not hasattr(n, 'job'))

#
#
#

def sqr(x, wait=0.0, event=None):
    if event is None:
        time.sleep(wait)
    else:
        event.wait(wait)
    return x*x

def mul(x, y):
    return x*y

def raise_large_valuerror(wait):
    time.sleep(wait)
    raise ValueError("x" * 1024**2)

def identity(x):
    return x

class CountedObject(object):
    n_instances = 0

    def __new__(cls):
        cls.n_instances += 1
        return object.__new__(cls)

    def __del__(self):
        type(self).n_instances -= 1

class SayWhenError(ValueError): pass

def exception_throwing_generator(total, when):
    if when == -1:
        raise SayWhenError("Somebody said when")
    for i in range(total):
        if i == when:
            raise SayWhenError("Somebody said when")
        yield i
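
# For example (a sketch, not used directly): exception_throwing_generator(5, 2)
# yields 0 and 1 and then raises SayWhenError, while when == -1 raises before
# the first yield.  The Pool tests below feed such generators to map()/imap()
# to check that a failing input iterable cannot hang the pool:
#
#   it = exception_throwing_generator(5, 2)
#   next(it)   # 0
#   next(it)   # 1
#   next(it)   # raises SayWhenError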


class _TestPool(BaseTestCase):

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        cls.pool = cls.Pool(4)

    @classmethod
    def tearDownClass(cls):
        cls.pool.terminate()
        cls.pool.join()
        cls.pool = None
        super().tearDownClass()

    def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
        self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
                         list(map(sqr, list(range(100)))))

    def test_starmap(self):
        psmap = self.pool.starmap
        tuples = list(zip(range(10), range(9,-1, -1)))
        self.assertEqual(psmap(mul, tuples),
                         list(itertools.starmap(mul, tuples)))
        tuples = list(zip(range(100), range(99,-1, -1)))
        self.assertEqual(psmap(mul, tuples, chunksize=20),
                         list(itertools.starmap(mul, tuples)))

    def test_starmap_async(self):
        tuples = list(zip(range(100), range(99,-1, -1)))
        self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
                         list(itertools.starmap(mul, tuples)))

    def test_map_async(self):
        self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(),
                         list(map(sqr, list(range(10)))))

    def test_map_async_callbacks(self):
        call_args = self.manager.list() if self.TYPE == 'manager' else []
        self.pool.map_async(int, ['1'],
                            callback=call_args.append,
                            error_callback=call_args.append).wait()
        self.assertEqual(1, len(call_args))
        self.assertEqual([1], call_args[0])
        self.pool.map_async(int, ['a'],
                            callback=call_args.append,
                            error_callback=call_args.append).wait()
        self.assertEqual(2, len(call_args))
        self.assertIsInstance(call_args[1], ValueError)

    def test_map_unpicklable(self):
        # Issue #19425 -- failure to pickle should not cause a hang
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        class A(object):
            def __reduce__(self):
                raise RuntimeError('cannot pickle')
        with self.assertRaises(RuntimeError):
            self.pool.map(sqr, [A()]*10)

    def test_map_chunksize(self):
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_map_handle_iterable_exception(self):
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        # SayWhenError seen at the very first of the iterable
        with self.assertRaises(SayWhenError):
            self.pool.map(sqr, exception_throwing_generator(1, -1), 1)
        # again, make sure it's reentrant
        with self.assertRaises(SayWhenError):
            self.pool.map(sqr, exception_throwing_generator(1, -1), 1)

        with self.assertRaises(SayWhenError):
            self.pool.map(sqr, exception_throwing_generator(10, 3), 1)

        class SpecialIterable:
            def __iter__(self):
                return self
            def __next__(self):
                raise SayWhenError
            def __len__(self):
                return 1
        with self.assertRaises(SayWhenError):
            self.pool.map(sqr, SpecialIterable(), 1)
        with self.assertRaises(SayWhenError):
            self.pool.map(sqr, SpecialIterable(), 1)

    def test_async(self):
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        p = self.Pool(3)
        try:
            event = threading.Event() if self.TYPE == 'threads' else None
            res = p.apply_async(sqr, (6, TIMEOUT2 + support.SHORT_TIMEOUT, event))
            get = TimingWrapper(res.get)
            self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
            self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
        finally:
            if event is not None:
                event.set()
            p.terminate()
            p.join()

    def test_imap(self):
        it = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(it), list(map(sqr, list(range(10)))))

        it = self.pool.imap(sqr, list(range(10)))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

    def test_imap_handle_iterable_exception(self):
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        # SayWhenError seen at the very first of the iterable
        it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1)
        self.assertRaises(SayWhenError, it.__next__)
        # again, make sure it's reentrant
        it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1)
        self.assertRaises(SayWhenError, it.__next__)

        it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
        for i in range(3):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, it.__next__)

        # SayWhenError seen at start of problematic chunk's results
        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2)
        for i in range(6):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, it.__next__)
        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4)
        for i in range(4):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, it.__next__)

    def test_imap_unordered(self):
        it = self.pool.imap_unordered(sqr, list(range(10)))
        self.assertEqual(sorted(it), list(map(sqr, list(range(10)))))

        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=100)
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

    def test_imap_unordered_handle_iterable_exception(self):
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        # SayWhenError seen at the very first of the iterable
        it = self.pool.imap_unordered(sqr,
                                      exception_throwing_generator(1, -1),
                                      1)
        self.assertRaises(SayWhenError, it.__next__)
        # again, make sure it's reentrant
        it = self.pool.imap_unordered(sqr,
                                      exception_throwing_generator(1, -1),
                                      1)
        self.assertRaises(SayWhenError, it.__next__)

        it = self.pool.imap_unordered(sqr,
                                      exception_throwing_generator(10, 3),
                                      1)
        expected_values = list(map(sqr, list(range(10))))
        with self.assertRaises(SayWhenError):
            # imap_unordered makes it difficult to anticipate the SayWhenError
            for i in range(10):
                value = next(it)
                self.assertIn(value, expected_values)
                expected_values.remove(value)

        it = self.pool.imap_unordered(sqr,
                                      exception_throwing_generator(20, 7),
                                      2)
        expected_values = list(map(sqr, list(range(20))))
        with self.assertRaises(SayWhenError):
            for i in range(20):
                value = next(it)
                self.assertIn(value, expected_values)
                expected_values.remove(value)

    def test_make_pool(self):
        expected_error = (RemoteError if self.TYPE == 'manager'
                          else ValueError)

        self.assertRaises(expected_error, self.Pool, -1)
        self.assertRaises(expected_error, self.Pool, 0)

        if self.TYPE != 'manager':
            p = self.Pool(3)
            try:
                self.assertEqual(3, len(p._pool))
            finally:
                p.close()
                p.join()

    def test_terminate(self):
        # Simulate slow tasks which take "forever" to complete
        sleep_time = support.LONG_TIMEOUT

        if self.TYPE == 'threads':
            # Thread pool workers can't be forced to quit, so if the first
            # task starts early enough, we will end up waiting for it.
            # Sleep for a shorter time, so the test doesn't block.
            sleep_time = 1

        p = self.Pool(3)
        args = [sleep_time for i in range(10_000)]
        result = p.map_async(time.sleep, args, chunksize=1)
        time.sleep(0.2)  # give some tasks a chance to start
        p.terminate()
        p.join()

    def test_empty_iterable(self):
        # See Issue 12157
        p = self.Pool(1)

        self.assertEqual(p.map(sqr, []), [])
        self.assertEqual(list(p.imap(sqr, [])), [])
        self.assertEqual(list(p.imap_unordered(sqr, [])), [])
        self.assertEqual(p.map_async(sqr, []).get(), [])

        p.close()
        p.join()

    def test_context(self):
        if self.TYPE == 'processes':
            L = list(range(10))
            expected = [sqr(i) for i in L]
            with self.Pool(2) as p:
                r = p.map_async(sqr, L)
                self.assertEqual(r.get(), expected)
            p.join()
            self.assertRaises(ValueError, p.map_async, sqr, L)

    @classmethod
    def _test_traceback(cls):
        raise RuntimeError(123) # some comment

    def test_traceback(self):
        # We want to ensure that the traceback from the child process is
        # contained in the traceback raised in the main process.
        if self.TYPE == 'processes':
            with self.Pool(1) as p:
                try:
                    p.apply(self._test_traceback)
                except Exception as e:
                    exc = e
                else:
                    self.fail('expected RuntimeError')
            p.join()
            self.assertIs(type(exc), RuntimeError)
            self.assertEqual(exc.args, (123,))
            cause = exc.__cause__
            self.assertIs(type(cause), multiprocessing.pool.RemoteTraceback)
            self.assertIn('raise RuntimeError(123) # some comment', cause.tb)

            with test.support.captured_stderr() as f1:
                try:
                    raise exc
                except RuntimeError:
                    sys.excepthook(*sys.exc_info())
            self.assertIn('raise RuntimeError(123) # some comment',
                          f1.getvalue())
            # _helper_reraises_exception should not make the error
            # a remote exception
            with self.Pool(1) as p:
                try:
                    p.map(sqr, exception_throwing_generator(1, -1), 1)
                except Exception as e:
                    exc = e
                else:
                    self.fail('expected SayWhenError')
                self.assertIs(type(exc), SayWhenError)
                self.assertIs(exc.__cause__, None)
            p.join()

    @classmethod
    def _test_wrapped_exception(cls):
        raise RuntimeError('foo')

    def test_wrapped_exception(self):
        # Issue #20980: Should not wrap exception when using thread pool
        with self.Pool(1) as p:
            with self.assertRaises(RuntimeError):
                p.apply(self._test_wrapped_exception)
        p.join()

    def test_map_no_failfast(self):
        # Issue #23992: the fail-fast behaviour when an exception is raised
        # during map() would make Pool.join() deadlock, because a worker
        # process would fill the result queue (after the result handler thread
        # terminated, hence not draining it anymore).

        t_start = time.monotonic()

        with self.assertRaises(ValueError):
            with self.Pool(2) as p:
                try:
                    p.map(raise_large_valuerror, [0, 1])
                finally:
                    time.sleep(0.5)
                    p.close()
                    p.join()

        # check that we indeed waited for all jobs
        self.assertGreater(time.monotonic() - t_start, 0.9)

    def test_release_task_refs(self):
        # Issue #29861: task arguments and results should not be kept
        # alive after we are done with them.
        objs = [CountedObject() for i in range(10)]
        refs = [weakref.ref(o) for o in objs]
        self.pool.map(identity, objs)

        del objs
        time.sleep(DELTA)  # let threaded cleanup code run
        support.gc_collect()  # For PyPy or other GCs.
        self.assertEqual(set(wr() for wr in refs), {None})
        # With a process pool, copies of the objects are returned, check
        # they were released too.
        self.assertEqual(CountedObject.n_instances, 0)

    def test_enter(self):
        if self.TYPE == 'manager':
            self.skipTest("test not applicable to manager")

        pool = self.Pool(1)
        with pool:
            pass
            # call pool.terminate()
        # pool is no longer running

        with self.assertRaises(ValueError):
            # bpo-35477: pool.__enter__() fails if the pool is not running
            with pool:
                pass
        pool.join()

    def test_resource_warning(self):
        if self.TYPE == 'manager':
            self.skipTest("test not applicable to manager")

        pool = self.Pool(1)
        pool.terminate()
        pool.join()

        # force state to RUN to emit ResourceWarning in __del__()
        pool._state = multiprocessing.pool.RUN

        with warnings_helper.check_warnings(
                ('unclosed running multiprocessing pool', ResourceWarning)):
            pool = None
            support.gc_collect()

def raising():
    raise KeyError("key")

def unpickleable_result():
    return lambda: 42
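
# A lambda cannot be pickled, so shipping it back from a worker makes the
# pool's result handler fail while encoding the result; _TestPoolWorkerErrors
# below checks that this surfaces as MaybeEncodingError instead of losing the
# worker.  Roughly (an illustrative sketch):
#
#   import pickle
#   pickle.dumps(unpickleable_result())   # raises a pickling error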

class _TestPoolWorkerErrors(BaseTestCase):
    ALLOWED_TYPES = ('processes', )

    def test_async_error_callback(self):
        p = multiprocessing.Pool(2)

        scratchpad = [None]
        def errback(exc):
            scratchpad[0] = exc

        res = p.apply_async(raising, error_callback=errback)
        self.assertRaises(KeyError, res.get)
        self.assertTrue(scratchpad[0])
        self.assertIsInstance(scratchpad[0], KeyError)

        p.close()
        p.join()

    def test_unpickleable_result(self):
        from multiprocessing.pool import MaybeEncodingError
        p = multiprocessing.Pool(2)

        # Make sure we don't lose pool processes because of encoding errors.
        for iteration in range(20):

            scratchpad = [None]
            def errback(exc):
                scratchpad[0] = exc

            res = p.apply_async(unpickleable_result, error_callback=errback)
            self.assertRaises(MaybeEncodingError, res.get)
            wrapped = scratchpad[0]
            self.assertTrue(wrapped)
            self.assertIsInstance(scratchpad[0], MaybeEncodingError)
            self.assertIsNotNone(wrapped.exc)
            self.assertIsNotNone(wrapped.value)

        p.close()
        p.join()

class _TestPoolWorkerLifetime(BaseTestCase):
    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        p = multiprocessing.Pool(3, maxtasksperchild=10)
        self.assertEqual(3, len(p._pool))
        origworkerpids = [w.pid for w in p._pool]
        # Run many tasks so each worker gets replaced (hopefully)
        results = []
        for i in range(100):
            results.append(p.apply_async(sqr, (i, )))
        # Fetch the results and verify we got the right answers,
        # also ensuring all the tasks have completed.
        for (j, res) in enumerate(results):
            self.assertEqual(res.get(), sqr(j))
        # Refill the pool
        p._repopulate_pool()
        # Wait until all workers are alive
        # (countdown * DELTA = 5 seconds max startup process time)
        countdown = 50
        while countdown and not all(w.is_alive() for w in p._pool):
            countdown -= 1
            time.sleep(DELTA)
        finalworkerpids = [w.pid for w in p._pool]
        # All pids should be assigned.  See issue #7805.
        self.assertNotIn(None, origworkerpids)
        self.assertNotIn(None, finalworkerpids)
        # Finally, check that the worker pids have changed
        self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
        p.close()
        p.join()

    def test_pool_worker_lifetime_early_close(self):
        # Issue #10332: closing a pool whose workers have limited lifetimes
        # before all the tasks completed would make join() hang.
        p = multiprocessing.Pool(3, maxtasksperchild=1)
        results = []
        for i in range(6):
            results.append(p.apply_async(sqr, (i, 0.3)))
        p.close()
        p.join()
        # check the results
        for (j, res) in enumerate(results):
            self.assertEqual(res.get(), sqr(j))

    def test_pool_maxtasksperchild_invalid(self):
        for value in [0, -1, 0.5, "12"]:
            with self.assertRaises(ValueError):
                multiprocessing.Pool(3, maxtasksperchild=value)

    def test_worker_finalization_via_atexit_handler_of_multiprocessing(self):
        # test cases against bpo-38744 and bpo-39360
        cmd = '''if 1:
            from multiprocessing import Pool
            problem = None
            class A:
                def __init__(self):
                    self.pool = Pool(processes=1)
            def test():
                global problem
                problem = A()
                problem.pool.map(float, tuple(range(10)))
            if __name__ == "__main__":
                test()
        '''
        rc, out, err = test.support.script_helper.assert_python_ok('-c', cmd)
        self.assertEqual(rc, 0)

#
# Test of creating a customized manager class
#

from multiprocessing.managers import BaseManager, BaseProxy, RemoteError

class FooBar(object):
    def f(self):
        return 'f()'
    def g(self):
        raise ValueError
    def _h(self):
        return '_h()'

def baz():
    for i in range(10):
        yield i*i

class IteratorProxy(BaseProxy):
    _exposed_ = ('__next__',)
    def __iter__(self):
        return self
    def __next__(self):
        return self._callmethod('__next__')

class MyManager(BaseManager):
    pass

MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
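
# Registration sketch: register() adds a typeid to the manager class;
# 'exposed' restricts which methods the proxy forwards (so 'Bar' hides g()
# but exposes _h()), and 'proxytype' substitutes a custom proxy class; here
# IteratorProxy forwards __next__() so the proxied generator from baz() is
# itself iterable.  For example, with a started manager:
#
#   manager = MyManager()
#   manager.start()
#   foo = manager.Foo()    # proxy around a FooBar instance
#   foo.f()                # 'f()'
#   list(manager.baz())    # [0, 1, 4, ..., 81]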
3144

class _TestMyManager(BaseTestCase):

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager(shutdown_timeout=SHUTDOWN_TIMEOUT)
        manager.start()
        self.common(manager)
        manager.shutdown()

        # bpo-30356: BaseManager._finalize_manager() sends SIGTERM
        # to the manager process if it takes longer than 1 second to stop,
        # which happens on slow buildbots.
        self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM))

    def test_mymanager_context(self):
        manager = MyManager(shutdown_timeout=SHUTDOWN_TIMEOUT)
        with manager:
            self.common(manager)
        # bpo-30356: BaseManager._finalize_manager() sends SIGTERM
        # to the manager process if it takes longer than 1 second to stop,
        # which happens on slow buildbots.
        self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM))

    def test_mymanager_context_prestarted(self):
        manager = MyManager(shutdown_timeout=SHUTDOWN_TIMEOUT)
        manager.start()
        with manager:
            self.common(manager)
        self.assertEqual(manager._process.exitcode, 0)

    def common(self, manager):
        foo = manager.Foo()
        bar = manager.Bar()
        baz = manager.baz()

        foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
        bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]

        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        self.assertEqual(list(baz), [i*i for i in range(10)])


#
# Test of connecting to a remote server and using xmlrpclib for serialization
#

_queue = pyqueue.Queue()
def get_queue():
    return _queue

class QueueManager(BaseManager):
    '''manager class used by server process'''
QueueManager.register('get_queue', callable=get_queue)

class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''
QueueManager2.register('get_queue')


SERIALIZER = 'xmlrpclib'

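# Sketch of the round-trip behaviour the tests below rely on: xmlrpc has no
# tuple type, so a tuple marshalled through it comes back as a list.  This
# assumes plain xmlrpc.client marshalling mirrors what the 'xmlrpclib'
# serializer puts on the wire.  Illustrative only.
def _example_xmlrpc_tuple_roundtrip():
    import xmlrpc.client
    params, _ = xmlrpc.client.loads(xmlrpc.client.dumps(((1, 2, 3),)))
    assert params[0] == [1, 2, 3]       # the tuple became a list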
class _TestRemoteManager(BaseTestCase):

    ALLOWED_TYPES = ('manager',)
    values = ['hello world', None, True, 2.25,
              'hall\xe5 v\xe4rlden',
              '\u043f\u0440\u0438\u0432\u0456\u0442 \u0441\u0432\u0456\u0442',
              b'hall\xe5 v\xe4rlden',
             ]
    result = values[:]

    @classmethod
    def _putter(cls, address, authkey):
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER,
            shutdown_timeout=SHUTDOWN_TIMEOUT)
        manager.connect()
        queue = manager.get_queue()
        # Note that xmlrpclib will deserialize the object as a list, not a tuple
        queue.put(tuple(cls.values))

    def test_remote(self):
        authkey = os.urandom(32)

        manager = QueueManager(
            address=(socket_helper.HOST, 0), authkey=authkey, serializer=SERIALIZER,
            shutdown_timeout=SHUTDOWN_TIMEOUT)
        manager.start()
        self.addCleanup(manager.shutdown)

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()

        manager2 = QueueManager2(
            address=manager.address, authkey=authkey, serializer=SERIALIZER,
            shutdown_timeout=SHUTDOWN_TIMEOUT)
        manager2.connect()
        queue = manager2.get_queue()

        self.assertEqual(queue.get(), self.result)

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del queue


@hashlib_helper.requires_hashdigest('sha256')
class _TestManagerRestart(BaseTestCase):

    @classmethod
    def _putter(cls, address, authkey):
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER,
            shutdown_timeout=SHUTDOWN_TIMEOUT)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=(socket_helper.HOST, 0), authkey=authkey,
            serializer=SERIALIZER, shutdown_timeout=SHUTDOWN_TIMEOUT)
        try:
            srvr = manager.get_server()
            addr = srvr.address
            # Close the connection.Listener socket which gets opened as part
            # of manager.get_server(). It's not needed for the test.
            srvr.listener.close()
            manager.start()

            p = self.Process(target=self._putter, args=(manager.address, authkey))
            p.start()
            p.join()
            queue = manager.get_queue()
            self.assertEqual(queue.get(), 'hello world')
            del queue
        finally:
            if hasattr(manager, "shutdown"):
                manager.shutdown()

        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER,
            shutdown_timeout=SHUTDOWN_TIMEOUT)
        try:
            manager.start()
            self.addCleanup(manager.shutdown)
        except OSError as e:
            if e.errno != errno.EADDRINUSE:
                raise
            # Retry after some time, in case the old socket was lingering
            # (sporadic failure on buildbots)
            time.sleep(1.0)
            manager = QueueManager(
                address=addr, authkey=authkey, serializer=SERIALIZER,
                shutdown_timeout=SHUTDOWN_TIMEOUT)
            if hasattr(manager, "shutdown"):
                self.addCleanup(manager.shutdown)


class FakeConnection:
    def send(self, payload):
        pass

    def recv(self):
        return '#ERROR', pyqueue.Empty()

class TestManagerExceptions(unittest.TestCase):
    # Issue 106558: Manager exceptions must avoid creating cyclic references.
    def setUp(self):
        self.mgr = multiprocessing.Manager()

    def tearDown(self):
        self.mgr.shutdown()
        self.mgr.join()

    def test_queue_get(self):
        queue = self.mgr.Queue()
        if gc.isenabled():
            gc.disable()
            self.addCleanup(gc.enable)
        try:
            queue.get_nowait()
        except pyqueue.Empty as e:
            wr = weakref.ref(e)
        self.assertEqual(wr(), None)

    def test_dispatch(self):
        if gc.isenabled():
            gc.disable()
            self.addCleanup(gc.enable)
        try:
            multiprocessing.managers.dispatch(FakeConnection(), None, None)
        except pyqueue.Empty as e:
            wr = weakref.ref(e)
        self.assertEqual(wr(), None)

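# Sketch of the invariant the two tests above check: with the GC disabled,
# an exception that crosses the manager boundary must not be kept alive by
# a reference cycle (e.g. through its traceback), so a weakref to it dies
# as soon as the except block ends.  Illustrative helper only.
def _example_exception_leaves_no_cycle(raise_error):
    gc.disable()
    try:
        try:
            raise_error()
        except Exception as exc:
            wr = weakref.ref(exc)
        return wr() is None     # True when nothing kept `exc` alive
    finally:
        gc.enable()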
#
#
#

SENTINEL = latin('')

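# The _echo helper below uses two-argument iter(): iter(conn.recv_bytes,
# SENTINEL) keeps calling recv_bytes() until it returns SENTINEL (the empty
# byte string).  A client-side sketch of the same protocol, assuming `conn`
# is one end of a duplex Pipe whose other end runs _echo:
def _example_echo_client(conn):
    conn.send_bytes(b'ping')
    assert conn.recv_bytes() == b'ping'
    conn.send_bytes(SENTINEL)       # tell the echo loop to exit
    conn.close()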
class _TestConnection(BaseTestCase):

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(-1), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)
        time.sleep(.1)

        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16 MiB
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            self.assertRaises(OSError, reader.send, 2)
            self.assertRaises(OSError, writer.recv)
            self.assertRaises(OSError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by the parent
        # process immediately after the child is spawned.  On Windows this
        # would sometimes fail on older versions because child_conn would
        # be closed before the child got a chance to duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)

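    # Sketch of the send_bytes(buf, offset, size) contract exercised above:
    # the slice buf[offset:offset+size] is sent, both values are validated
    # against the buffer bounds, and a zero size is allowed.  Illustrative
    # helper; not called by the tests.
    @staticmethod
    def _example_send_bytes_slice(a, b):
        a.send_bytes(b'abcdef', 2, 3)       # sends b'cde'
        assert b.recv_bytes() == b'cde'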
    @classmethod
    def _is_fd_assigned(cls, fd):
        try:
            os.fstat(fd)
        except OSError as e:
            if e.errno == errno.EBADF:
                return False
            raise
        else:
            return True

    @classmethod
    def _writefd(cls, conn, data, create_dummy_fds=False):
        if create_dummy_fds:
            for i in range(0, 256):
                if not cls._is_fd_assigned(i):
                    os.dup2(conn.fileno(), i)
        fd = reduction.recv_handle(conn)
        if msvcrt:
            fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
        os.write(fd, data)
        os.close(fd)

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    def test_fd_transfer(self):
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
        p.daemon = True
        p.start()
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        with open(os_helper.TESTFN, "wb") as f:
            fd = f.fileno()
            if msvcrt:
                fd = msvcrt.get_osfhandle(fd)
            reduction.send_handle(conn, fd, p.pid)
        p.join()
        with open(os_helper.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"foo")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32",
                     "test semantics don't make sense on Windows")
    @unittest.skipIf(MAXFD <= 256,
                     "largest assignable fd number is too small")
    @unittest.skipUnless(hasattr(os, "dup2"),
                         "test needs os.dup2()")
    def test_large_fd_transfer(self):
        # With fd > 256 (issue #11657)
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
        p.daemon = True
        p.start()
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        with open(os_helper.TESTFN, "wb") as f:
            fd = f.fileno()
            for newfd in range(256, MAXFD):
                if not self._is_fd_assigned(newfd):
                    break
            else:
                self.fail("could not find an unassigned large file descriptor")
            os.dup2(fd, newfd)
            try:
                reduction.send_handle(conn, newfd, p.pid)
            finally:
                os.close(newfd)
        p.join()
        with open(os_helper.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"bar")

    @classmethod
    def _send_data_without_fd(cls, conn):
        os.write(conn.fileno(), b"\0")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
    def test_missing_fd_transfer(self):
        # Check that an exception is raised when the received data is not
        # accompanied by a file descriptor in ancillary data.
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
        p.daemon = True
        p.start()
        self.assertRaises(RuntimeError, reduction.recv_handle, conn)
        p.join()

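    # Sketch (POSIX, HAS_REDUCTION) of the handle-passing primitive the fd
    # transfer tests exercise: send_handle() duplicates a file descriptor
    # into the process identified by pid, and recv_handle() on the other
    # end of the connection returns the new descriptor.  Illustrative
    # helper; not called by the tests.
    @staticmethod
    def _example_fd_transfer(conn, pid, path):
        with open(path, 'wb') as f:
            reduction.send_handle(conn, f.fileno(), pid)
        # in the receiving process:
        #     fd = reduction.recv_handle(conn)
        #     os.write(fd, b'data'); os.close(fd)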
    def test_context(self):
        a, b = self.Pipe()

        with a, b:
            a.send(1729)
            self.assertEqual(b.recv(), 1729)
            if self.TYPE == 'processes':
                self.assertFalse(a.closed)
                self.assertFalse(b.closed)

        if self.TYPE == 'processes':
            self.assertTrue(a.closed)
            self.assertTrue(b.closed)
            self.assertRaises(OSError, a.recv)
            self.assertRaises(OSError, b.recv)

class _TestListener(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def test_multiple_bind(self):
        for family in self.connection.families:
            l = self.connection.Listener(family=family)
            self.addCleanup(l.close)
            self.assertRaises(OSError, self.connection.Listener,
                              l.address, family)

    def test_context(self):
        with self.connection.Listener() as l:
            with self.connection.Client(l.address) as c:
                with l.accept() as d:
                    c.send(1729)
                    self.assertEqual(d.recv(), 1729)

        if self.TYPE == 'processes':
            self.assertRaises(OSError, l.accept)

    def test_empty_authkey(self):
        # bpo-43952: allow empty bytes as authkey
        def handler(*args):
            raise RuntimeError('Connection took too long...')

        def run(addr, authkey):
            client = self.connection.Client(addr, authkey=authkey)
            client.send(1729)

        key = b''

        with self.connection.Listener(authkey=key) as listener:
            thread = threading.Thread(target=run, args=(listener.address, key))
            thread.start()
            try:
                with listener.accept() as d:
                    self.assertEqual(d.recv(), 1729)
            finally:
                thread.join()

        if self.TYPE == 'processes':
            with self.assertRaises(OSError):
                listener.accept()

    @unittest.skipUnless(util.abstract_sockets_supported,
                         "test needs abstract socket support")
    def test_abstract_socket(self):
        with self.connection.Listener("\0something") as listener:
            with self.connection.Client(listener.address) as client:
                with listener.accept() as d:
                    client.send(1729)
                    self.assertEqual(d.recv(), 1729)

        if self.TYPE == 'processes':
            self.assertRaises(OSError, listener.accept)


class _TestListenerClient(BaseTestCase):

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        conn = cls.connection.Client(address)
        conn.send('hello')
        conn.close()

    def test_listener_client(self):
        for family in self.connection.families:
            l = self.connection.Listener(family=family)
            p = self.Process(target=self._test, args=(l.address,))
            p.daemon = True
            p.start()
            conn = l.accept()
            self.assertEqual(conn.recv(), 'hello')
            p.join()
            l.close()

    def test_issue14725(self):
        l = self.connection.Listener()
        p = self.Process(target=self._test, args=(l.address,))
        p.daemon = True
        p.start()
        time.sleep(1)
        # On Windows the client process should by now have connected,
        # written its data and closed the pipe handle.  This causes
        # ConnectNamedPipe() to fail with ERROR_NO_DATA.  See Issue 14725.
        conn = l.accept()
        self.assertEqual(conn.recv(), 'hello')
        conn.close()
        p.join()
        l.close()

    def test_issue16955(self):
        for fam in self.connection.families:
            l = self.connection.Listener(family=fam)
            c = self.connection.Client(l.address)
            a = l.accept()
            a.send_bytes(b"hello")
            self.assertTrue(c.poll(1))
            a.close()
            c.close()
            l.close()

class _TestPoll(BaseTestCase):

    ALLOWED_TYPES = ('processes', 'threads')

    def test_empty_string(self):
        a, b = self.Pipe()
        self.assertEqual(a.poll(), False)
        b.send_bytes(b'')
        self.assertEqual(a.poll(), True)
        self.assertEqual(a.poll(), True)

    @classmethod
    def _child_strings(cls, conn, strings):
        for s in strings:
            time.sleep(0.1)
            conn.send_bytes(s)
        conn.close()

    def test_strings(self):
        strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
        a, b = self.Pipe()
        p = self.Process(target=self._child_strings, args=(b, strings))
        p.start()

        for s in strings:
            for i in range(200):
                if a.poll(0.01):
                    break
            x = a.recv_bytes()
            self.assertEqual(s, x)

        p.join()

    @classmethod
    def _child_boundaries(cls, r):
        # Polling may "pull" a message in to the child process, but we
        # don't want it to pull only part of a message, as that would
        # corrupt the pipe for any other processes which might later
        # read from it.
        r.poll(5)

    def test_boundaries(self):
        r, w = self.Pipe(False)
        p = self.Process(target=self._child_boundaries, args=(r,))
        p.start()
        time.sleep(2)
        L = [b"first", b"second"]
        for obj in L:
            w.send_bytes(obj)
        w.close()
        p.join()
        self.assertIn(r.recv_bytes(), L)

    @classmethod
    def _child_dont_merge(cls, b):
        b.send_bytes(b'a')
        b.send_bytes(b'b')
        b.send_bytes(b'cd')

    def test_dont_merge(self):
        a, b = self.Pipe()
        self.assertEqual(a.poll(0.0), False)
        self.assertEqual(a.poll(0.1), False)

        p = self.Process(target=self._child_dont_merge, args=(b,))
        p.start()

        self.assertEqual(a.recv_bytes(), b'a')
        self.assertEqual(a.poll(1.0), True)
        self.assertEqual(a.poll(1.0), True)
        self.assertEqual(a.recv_bytes(), b'b')
        self.assertEqual(a.poll(1.0), True)
        self.assertEqual(a.poll(1.0), True)
        self.assertEqual(a.poll(0.0), True)
        self.assertEqual(a.recv_bytes(), b'cd')

        p.join()

#
# Test of sending connection and socket objects between processes
#

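# Sketch of what "pickling a connection" means below: with the reduction
# machinery available, a Connection object can itself be sent through
# another Connection, and the receiving process gets a working handle.
# Illustrative only; `conn` is assumed to reach a cooperating process.
def _example_send_connection(conn):
    r, w = multiprocessing.Pipe(duplex=False)
    conn.send(r)            # ship the read end to the other process
    r.close()               # our copy is no longer needed
    w.send('payload')       # the remote end can now recv() it
    w.close()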
@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
@hashlib_helper.requires_hashdigest('sha256')
class _TestPicklingConnections(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def tearDownClass(cls):
        from multiprocessing import resource_sharer
        resource_sharer.stop(timeout=support.LONG_TIMEOUT)

    @classmethod
    def _listener(cls, conn, families):
        for fam in families:
            l = cls.connection.Listener(family=fam)
            conn.send(l.address)
            new_conn = l.accept()
            conn.send(new_conn)
            new_conn.close()
            l.close()

        l = socket.create_server((socket_helper.HOST, 0))
        conn.send(l.getsockname())
        new_conn, addr = l.accept()
        conn.send(new_conn)
        new_conn.close()
        l.close()

        conn.recv()

    @classmethod
    def _remote(cls, conn):
        for (address, msg) in iter(conn.recv, None):
            client = cls.connection.Client(address)
            client.send(msg.upper())
            client.close()

        address, msg = conn.recv()
        client = socket.socket()
        client.connect(address)
        client.sendall(msg.upper())
        client.close()

        conn.close()

    def test_pickling(self):
        families = self.connection.families

        lconn, lconn0 = self.Pipe()
        lp = self.Process(target=self._listener, args=(lconn0, families))
        lp.daemon = True
        lp.start()
        lconn0.close()

        rconn, rconn0 = self.Pipe()
        rp = self.Process(target=self._remote, args=(rconn0,))
        rp.daemon = True
        rp.start()
        rconn0.close()

        for fam in families:
            msg = ('This connection uses family %s' % fam).encode('ascii')
            address = lconn.recv()
            rconn.send((address, msg))
            new_conn = lconn.recv()
            self.assertEqual(new_conn.recv(), msg.upper())

        rconn.send(None)

        msg = latin('This connection uses a normal socket')
        address = lconn.recv()
        rconn.send((address, msg))
        new_conn = lconn.recv()
        buf = []
        while True:
            s = new_conn.recv(100)
            if not s:
                break
            buf.append(s)
        buf = b''.join(buf)
        self.assertEqual(buf, msg.upper())
        new_conn.close()

        lconn.send(None)

        rconn.close()
        lconn.close()

        lp.join()
        rp.join()

    @classmethod
    def child_access(cls, conn):
        w = conn.recv()
        w.send('all is well')
        w.close()

        r = conn.recv()
        msg = r.recv()
        conn.send(msg*2)

        conn.close()

    def test_access(self):
        # On Windows, if we do not specify a destination pid when
        # using DupHandle then we need to be careful to use the
        # correct access flags for DuplicateHandle(), or else
        # DupHandle.detach() will raise PermissionError.  For example,
        # for a read only pipe handle we should use
        # access=FILE_GENERIC_READ.  (Unfortunately
        # DUPLICATE_SAME_ACCESS does not work.)
        conn, child_conn = self.Pipe()
        p = self.Process(target=self.child_access, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()

        r, w = self.Pipe(duplex=False)
        conn.send(w)
        w.close()
        self.assertEqual(r.recv(), 'all is well')
        r.close()

        r, w = self.Pipe(duplex=False)
        conn.send(r)
        r.close()
        w.send('foobar')
        w.close()
        self.assertEqual(conn.recv(), 'foobar'*2)

        p.join()

#
#
#

class _TestHeap(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        super().setUp()
        # Make pristine heap for these tests
        self.old_heap = multiprocessing.heap.BufferWrapper._heap
        multiprocessing.heap.BufferWrapper._heap = multiprocessing.heap.Heap()

    def tearDown(self):
        multiprocessing.heap.BufferWrapper._heap = self.old_heap
        super().tearDown()

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap
        heap._DISCARD_FREE_SPACE_LARGER_THAN = 0

        # create and destroy lots of blocks of different sizes
        for i in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            b = multiprocessing.heap.BufferWrapper(size)
            blocks.append(b)
            if len(blocks) > maxblocks:
                i = random.randrange(maxblocks)
                del blocks[i]
            del b

        # verify the state of the heap
        with heap._lock:
            all = []
            free = 0
            occupied = 0
            for L in list(heap._len_to_seq.values()):
                # count all free blocks in arenas
                for arena, start, stop in L:
                    all.append((heap._arenas.index(arena), start, stop,
                                stop-start, 'free'))
                    free += (stop-start)
            for arena, arena_blocks in heap._allocated_blocks.items():
                # count all allocated blocks in arenas
                for start, stop in arena_blocks:
                    all.append((heap._arenas.index(arena), start, stop,
                                stop-start, 'occupied'))
                    occupied += (stop-start)

            self.assertEqual(free + occupied,
                             sum(arena.size for arena in heap._arenas))

            all.sort()

            for i in range(len(all)-1):
                (arena, start, stop) = all[i][:3]
                (narena, nstart, nstop) = all[i+1][:3]
                if arena != narena:
                    # Two different arenas
                    self.assertEqual(stop, heap._arenas[arena].size)  # last block
                    self.assertEqual(nstart, 0)         # first block
                else:
                    # Same arena: two adjacent blocks
                    self.assertEqual(stop, nstart)

        # test freeing all blocks
        random.shuffle(blocks)
        while blocks:
            blocks.pop()

        self.assertEqual(heap._n_frees, heap._n_mallocs)
        self.assertEqual(len(heap._pending_free_blocks), 0)
        self.assertEqual(len(heap._arenas), 0)
        self.assertEqual(len(heap._allocated_blocks), 0, heap._allocated_blocks)
        self.assertEqual(len(heap._len_to_seq), 0)

    def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't deadlock
        # (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for i in range(5000):
            a = multiprocessing.heap.BufferWrapper(1)
            b = multiprocessing.heap.BufferWrapper(1)
            # circular references
            a.buddy = b
            b.buddy = a

#
#
#

class _Foo(Structure):
    _fields_ = [
        ('x', c_int),
        ('y', c_double),
        ('z', c_longlong),
        ]

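# Sketch of the sharedctypes API under test: Value/Array allocate ctypes
# objects in shared memory, and with lock=True they are wrapped in a
# synchronized proxy whose lock serialises concurrent mutation.
# Illustrative only; assumes HAS_SHAREDCTYPES.
def _example_sharedctypes_value():
    counter = Value('i', 0, lock=True)      # synchronized wrapper
    with counter.get_lock():                # serialise concurrent updates
        counter.value += 1
    return counter.value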
class _TestSharedCTypes(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, z, foo, arr, string):
        x.value *= 2
        y.value *= 2
        z.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for i in range(len(arr)):
            arr[i] *= 2

    def test_sharedctypes(self, lock=False):
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        z = Value(c_longlong, 2 ** 33, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', list(range(10)), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        p = self.Process(target=self._double, args=(x, y, z, foo, arr, string))
        p.daemon = True
        p.start()
        p.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(z.value, 2 ** 34)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for i in range(10):
            self.assertAlmostEqual(arr[i], i*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        foo = _Foo(2, 5.0, 2 ** 33)
        bar = copy(foo)
        foo.x = 0
        foo.y = 0
        foo.z = 0
        self.assertEqual(bar.x, 2)
        self.assertAlmostEqual(bar.y, 5.0)
        self.assertEqual(bar.z, 2 ** 33)


@unittest.skipUnless(HAS_SHMEM, "requires multiprocessing.shared_memory")
@hashlib_helper.requires_hashdigest('sha256')
class _TestSharedMemory(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    @staticmethod
    def _attach_existing_shmem_then_write(shmem_name_or_obj, binary_data):
        if isinstance(shmem_name_or_obj, str):
            local_sms = shared_memory.SharedMemory(shmem_name_or_obj)
        else:
            local_sms = shmem_name_or_obj
        local_sms.buf[:len(binary_data)] = binary_data
        local_sms.close()

    def _new_shm_name(self, prefix):
        # Add a PID to the name of a POSIX shared memory object to allow
        # running multiprocessing tests (test_multiprocessing_fork,
        # test_multiprocessing_spawn, etc) in parallel.
        return prefix + str(os.getpid())

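    # Sketch of the SharedMemory lifecycle the tests below walk through:
    # create=True allocates a named block, SharedMemory(name) attaches to
    # it from any process, close() drops this process's mapping, and
    # unlink() (called once, by one owner) requests destruction of the
    # block.  Illustrative helper; not called by the tests.
    @staticmethod
    def _example_shared_memory_lifecycle():
        block = shared_memory.SharedMemory(create=True, size=16)
        try:
            block.buf[:2] = b'hi'
            view = shared_memory.SharedMemory(block.name)  # attach by name
            assert bytes(view.buf[:2]) == b'hi'
            view.close()
        finally:
            block.close()
            block.unlink()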
    def test_shared_memory_name_with_embedded_null(self):
        name_tsmb = self._new_shm_name('test01_null')
        sms = shared_memory.SharedMemory(name_tsmb, create=True, size=512)
        self.addCleanup(sms.unlink)
        with self.assertRaises(ValueError):
            shared_memory.SharedMemory(name_tsmb + '\0a', create=False, size=512)
        if shared_memory._USE_POSIX:
            orig_name = sms._name
            try:
                sms._name = orig_name + '\0a'
                with self.assertRaises(ValueError):
                    sms.unlink()
            finally:
                sms._name = orig_name

    def test_shared_memory_basics(self):
        name_tsmb = self._new_shm_name('test01_tsmb')
        sms = shared_memory.SharedMemory(name_tsmb, create=True, size=512)
        self.addCleanup(sms.unlink)

        # Verify attributes are readable.
        self.assertEqual(sms.name, name_tsmb)
        self.assertGreaterEqual(sms.size, 512)
        self.assertGreaterEqual(len(sms.buf), sms.size)

        # Verify __repr__
        self.assertIn(sms.name, str(sms))
        self.assertIn(str(sms.size), str(sms))

        # Modify contents of shared memory segment through memoryview.
        sms.buf[0] = 42
        self.assertEqual(sms.buf[0], 42)

        # Attach to existing shared memory segment.
        also_sms = shared_memory.SharedMemory(name_tsmb)
        self.assertEqual(also_sms.buf[0], 42)
        also_sms.close()

        # Attach to existing shared memory segment but specify a new size.
        same_sms = shared_memory.SharedMemory(name_tsmb, size=20*sms.size)
        self.assertLess(same_sms.size, 20*sms.size)  # Size was ignored.
        same_sms.close()

        # Creating a shared memory segment with a negative size
        with self.assertRaises(ValueError):
            shared_memory.SharedMemory(create=True, size=-2)

        # Attaching to a shared memory segment without a name
        with self.assertRaises(ValueError):
            shared_memory.SharedMemory(create=False)

        # Test that the shared memory segment is created properly when
        # _make_filename returns the name of an existing segment
        with unittest.mock.patch(
            'multiprocessing.shared_memory._make_filename') as mock_make_filename:

            NAME_PREFIX = shared_memory._SHM_NAME_PREFIX
            names = [self._new_shm_name('test01_fn'), self._new_shm_name('test02_fn')]
            # Prepend NAME_PREFIX which can be '/psm_' or 'wnsm_', necessary
            # because some POSIX compliant systems require name to start with /
            names = [NAME_PREFIX + name for name in names]

            mock_make_filename.side_effect = names
            shm1 = shared_memory.SharedMemory(create=True, size=1)
            self.addCleanup(shm1.unlink)
            self.assertEqual(shm1._name, names[0])

            mock_make_filename.side_effect = names
            shm2 = shared_memory.SharedMemory(create=True, size=1)
            self.addCleanup(shm2.unlink)
            self.assertEqual(shm2._name, names[1])

        if shared_memory._USE_POSIX:
            # POSIX shared memory can only be unlinked once.  Here we
            # test an implementation detail that is not observed across
            # all supported platforms (since WindowsNamedSharedMemory
            # manages unlinking on its own and unlink() does nothing).
            # True release of shared memory segment does not necessarily
            # happen until process exits, depending on the OS platform.
            name_dblunlink = self._new_shm_name('test01_dblunlink')
            sms_uno = shared_memory.SharedMemory(
                name_dblunlink,
                create=True,
                size=5000
            )
            with self.assertRaises(FileNotFoundError):
                try:
                    self.assertGreaterEqual(sms_uno.size, 5000)

                    sms_duo = shared_memory.SharedMemory(name_dblunlink)
                    sms_duo.unlink()  # First shm_unlink() call.
                    sms_duo.close()
                    sms_uno.close()

                finally:
                    sms_uno.unlink()  # A second shm_unlink() call is bad.

        with self.assertRaises(FileExistsError):
            # Attempting to create a new shared memory segment with a
            # name that is already in use triggers an exception.
            there_can_only_be_one_sms = shared_memory.SharedMemory(
                name_tsmb,
                create=True,
                size=512
            )

        if shared_memory._USE_POSIX:
            # Requesting creation of a shared memory segment with the option
            # to attach to an existing segment, if that name is currently in
            # use, should not trigger an exception.
            # Note:  Using a smaller size could possibly cause truncation of
            # the existing segment but is OS platform dependent.  In the
            # case of MacOS/darwin, requesting a smaller size is disallowed.
            class OptionalAttachSharedMemory(shared_memory.SharedMemory):
                _flags = os.O_CREAT | os.O_RDWR
            ok_if_exists_sms = OptionalAttachSharedMemory(name_tsmb)
            self.assertEqual(ok_if_exists_sms.size, sms.size)
            ok_if_exists_sms.close()

        # Attempting to attach to an existing shared memory segment when
        # no segment exists with the supplied name triggers an exception.
        with self.assertRaises(FileNotFoundError):
            nonexisting_sms = shared_memory.SharedMemory('test01_notthere')
            nonexisting_sms.unlink()  # Error should occur on prior line.

        sms.close()

    def test_shared_memory_recreate(self):
        # Test that the shared memory segment is created properly when
        # _make_filename returns the name of an existing segment
        with unittest.mock.patch(
            'multiprocessing.shared_memory._make_filename') as mock_make_filename:

            NAME_PREFIX = shared_memory._SHM_NAME_PREFIX
            names = [self._new_shm_name('test03_fn'), self._new_shm_name('test04_fn')]
            # Prepend NAME_PREFIX which can be '/psm_' or 'wnsm_', necessary
            # because some POSIX compliant systems require name to start with /
            names = [NAME_PREFIX + name for name in names]

            mock_make_filename.side_effect = names
            shm1 = shared_memory.SharedMemory(create=True, size=1)
            self.addCleanup(shm1.unlink)
            self.assertEqual(shm1._name, names[0])

            mock_make_filename.side_effect = names
            shm2 = shared_memory.SharedMemory(create=True, size=1)
            self.addCleanup(shm2.unlink)
            self.assertEqual(shm2._name, names[1])

    def test_invalid_shared_memory_creation(self):
        # Test creating a shared memory segment with a negative size
        with self.assertRaises(ValueError):
            sms_invalid = shared_memory.SharedMemory(create=True, size=-1)

        # Test creating a shared memory segment with size 0
        with self.assertRaises(ValueError):
            sms_invalid = shared_memory.SharedMemory(create=True, size=0)

        # Test creating a shared memory segment without a size argument
        with self.assertRaises(ValueError):
            sms_invalid = shared_memory.SharedMemory(create=True)

    def test_shared_memory_pickle_unpickle(self):
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.subTest(proto=proto):
                sms = shared_memory.SharedMemory(create=True, size=512)
                self.addCleanup(sms.unlink)
                sms.buf[0:6] = b'pickle'

                # Test pickling
                pickled_sms = pickle.dumps(sms, protocol=proto)

                # Test unpickling
                sms2 = pickle.loads(pickled_sms)
                self.assertIsInstance(sms2, shared_memory.SharedMemory)
                self.assertEqual(sms.name, sms2.name)
                self.assertEqual(bytes(sms.buf[0:6]), b'pickle')
                self.assertEqual(bytes(sms2.buf[0:6]), b'pickle')

                # Test that the unpickled version still refers to the same
                # SharedMemory block.
                sms.buf[0:6] = b'newval'
                self.assertEqual(bytes(sms.buf[0:6]), b'newval')
                self.assertEqual(bytes(sms2.buf[0:6]), b'newval')

                sms2.buf[0:6] = b'oldval'
                self.assertEqual(bytes(sms.buf[0:6]), b'oldval')
                self.assertEqual(bytes(sms2.buf[0:6]), b'oldval')

    def test_shared_memory_pickle_unpickle_dead_object(self):
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.subTest(proto=proto):
                sms = shared_memory.SharedMemory(create=True, size=512)
                sms.buf[0:6] = b'pickle'
                pickled_sms = pickle.dumps(sms, protocol=proto)

                # Now, we are going to kill the original object.
                # So, the unpickled one won't be able to attach to it.
                sms.close()
                sms.unlink()

                with self.assertRaises(FileNotFoundError):
                    pickle.loads(pickled_sms)

    def test_shared_memory_across_processes(self):
        # bpo-40135: don't define the shared memory block's name in case of
        # failure when we run multiprocessing tests in parallel.
        sms = shared_memory.SharedMemory(create=True, size=512)
        self.addCleanup(sms.unlink)

        # Verify remote attachment to existing block by name is working.
        p = self.Process(
            target=self._attach_existing_shmem_then_write,
            args=(sms.name, b'howdy')
        )
        p.daemon = True
        p.start()
        p.join()
        self.assertEqual(bytes(sms.buf[:5]), b'howdy')

        # Verify pickling of SharedMemory instance also works.
        p = self.Process(
            target=self._attach_existing_shmem_then_write,
            args=(sms, b'HELLO')
        )
        p.daemon = True
        p.start()
        p.join()
        self.assertEqual(bytes(sms.buf[:5]), b'HELLO')

        sms.close()

    @unittest.skipIf(os.name != "posix", "not feasible on non-posix platforms")
    def test_shared_memory_SharedMemoryServer_ignores_sigint(self):
        # bpo-36368: protect SharedMemoryManager server process from
        # KeyboardInterrupt signals.
        smm = multiprocessing.managers.SharedMemoryManager()
        smm.start()

        # make sure the manager works properly at the beginning
        sl = smm.ShareableList(range(10))

        # the manager's server should ignore KeyboardInterrupt signals,
        # maintain its connection with the current process, and succeed
        # when asked to deliver memory segments.
        os.kill(smm._process.pid, signal.SIGINT)

        sl2 = smm.ShareableList(range(10))

        # test that the custom signal handler registered in the Manager does
        # not affect signal handling in the parent process.
        with self.assertRaises(KeyboardInterrupt):
            os.kill(os.getpid(), signal.SIGINT)

        smm.shutdown()

    @unittest.skipIf(os.name != "posix", "resource_tracker is posix only")
    def test_shared_memory_SharedMemoryManager_reuses_resource_tracker(self):
        # bpo-36867: test that a SharedMemoryManager uses the
        # same resource_tracker process as its parent.
        cmd = '''if 1:
            from multiprocessing.managers import SharedMemoryManager


            smm = SharedMemoryManager()
            smm.start()
            sl = smm.ShareableList(range(10))
            smm.shutdown()
        '''
        rc, out, err = test.support.script_helper.assert_python_ok('-c', cmd)

        # Before bpo-36867 was fixed, a SharedMemoryManager not using the same
        # resource_tracker process as its parent would make the parent's
        # tracker complain about sl being leaked even though smm.shutdown()
        # properly released sl.
        self.assertFalse(err)

    def test_shared_memory_SharedMemoryManager_basics(self):
        smm1 = multiprocessing.managers.SharedMemoryManager()
        with self.assertRaises(ValueError):
            smm1.SharedMemory(size=9)  # Fails if SharedMemoryServer not started
        smm1.start()
        lol = [ smm1.ShareableList(range(i)) for i in range(5, 10) ]
        lom = [ smm1.SharedMemory(size=j) for j in range(32, 128, 16) ]
        doppleganger_list0 = shared_memory.ShareableList(name=lol[0].shm.name)
        self.assertEqual(len(doppleganger_list0), 5)
        doppleganger_shm0 = shared_memory.SharedMemory(name=lom[0].name)
        self.assertGreaterEqual(len(doppleganger_shm0.buf), 32)
        held_name = lom[0].name
        smm1.shutdown()
        if sys.platform != "win32":
            # Calls to unlink() have no effect on Windows platform; shared
            # memory will only be released once final process exits.
            with self.assertRaises(FileNotFoundError):
                # No longer there to be attached to again.
                absent_shm = shared_memory.SharedMemory(name=held_name)

        with multiprocessing.managers.SharedMemoryManager() as smm2:
            sl = smm2.ShareableList("howdy")
            shm = smm2.SharedMemory(size=128)
            held_name = sl.shm.name
        if sys.platform != "win32":
            with self.assertRaises(FileNotFoundError):
                # No longer there to be attached to again.
                absent_sl = shared_memory.ShareableList(name=held_name)


    def test_shared_memory_ShareableList_basics(self):
        sl = shared_memory.ShareableList(
            ['howdy', b'HoWdY', -273.154, 100, None, True, 42]
        )
        self.addCleanup(sl.shm.unlink)

        # Verify __repr__
        self.assertIn(sl.shm.name, str(sl))
        self.assertIn(str(list(sl)), str(sl))

        # Index Out of Range (get)
        with self.assertRaises(IndexError):
            sl[7]

        # Index Out of Range (set)
        with self.assertRaises(IndexError):
            sl[7] = 2

        # Assign value without format change (str -> str)
        current_format = sl._get_packing_format(0)
        sl[0] = 'howdy'
        self.assertEqual(current_format, sl._get_packing_format(0))

        # Verify attributes are readable.
        self.assertEqual(sl.format, '8s8sdqxxxxxx?xxxxxxxx?q')

        # Exercise len().
        self.assertEqual(len(sl), 7)

        # Exercise index().
        with warnings.catch_warnings():
            # Suppress BytesWarning when comparing against b'HoWdY'.
            warnings.simplefilter('ignore')
            with self.assertRaises(ValueError):
                sl.index('100')
            self.assertEqual(sl.index(100), 3)

        # Exercise retrieving individual values.
        self.assertEqual(sl[0], 'howdy')
        self.assertEqual(sl[-2], True)

        # Exercise iterability.
        self.assertEqual(
            tuple(sl),
            ('howdy', b'HoWdY', -273.154, 100, None, True, 42)
        )

        # Exercise modifying individual values.
        sl[3] = 42
        self.assertEqual(sl[3], 42)
        sl[4] = 'some'  # Change type at a given position.
        self.assertEqual(sl[4], 'some')
        self.assertEqual(sl.format, '8s8sdq8sxxxxxxx?q')
        with self.assertRaisesRegex(ValueError,
                                    "exceeds available storage"):
            sl[4] = 'far too many'
        self.assertEqual(sl[4], 'some')
        sl[0] = 'encodés'  # Exactly 8 bytes of UTF-8 data
        self.assertEqual(sl[0], 'encodés')
        self.assertEqual(sl[1], b'HoWdY')  # no spillage
        with self.assertRaisesRegex(ValueError,
                                    "exceeds available storage"):
            sl[0] = 'encodées'  # Exactly 9 bytes of UTF-8 data
        self.assertEqual(sl[1], b'HoWdY')
        with self.assertRaisesRegex(ValueError,
                                    "exceeds available storage"):
            sl[1] = b'123456789'
        self.assertEqual(sl[1], b'HoWdY')

        # Exercise count().
        with warnings.catch_warnings():
            # Suppress BytesWarning when comparing against b'HoWdY'.
            warnings.simplefilter('ignore')
            self.assertEqual(sl.count(42), 2)
            self.assertEqual(sl.count(b'HoWdY'), 1)
            self.assertEqual(sl.count(b'adios'), 0)

        # Exercise creating a duplicate.
        name_duplicate = self._new_shm_name('test03_duplicate')
        sl_copy = shared_memory.ShareableList(sl, name=name_duplicate)
        try:
            self.assertNotEqual(sl.shm.name, sl_copy.shm.name)
            self.assertEqual(name_duplicate, sl_copy.shm.name)
            self.assertEqual(list(sl), list(sl_copy))
            self.assertEqual(sl.format, sl_copy.format)
            sl_copy[-1] = 77
            self.assertEqual(sl_copy[-1], 77)
            self.assertNotEqual(sl[-1], 77)
            sl_copy.shm.close()
        finally:
            sl_copy.shm.unlink()

        # Obtain a second handle on the same ShareableList.
        sl_tethered = shared_memory.ShareableList(name=sl.shm.name)
        self.assertEqual(sl.shm.name, sl_tethered.shm.name)
        sl_tethered[-1] = 880
        self.assertEqual(sl[-1], 880)
        sl_tethered.shm.close()

        sl.shm.close()

        # Exercise creating an empty ShareableList.
        empty_sl = shared_memory.ShareableList()
        try:
            self.assertEqual(len(empty_sl), 0)
            self.assertEqual(empty_sl.format, '')
            self.assertEqual(empty_sl.count('any'), 0)
            with self.assertRaises(ValueError):
                empty_sl.index(None)
            empty_sl.shm.close()
        finally:
            empty_sl.shm.unlink()

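    # Sketch of the ShareableList constraint exercised above: the backing
    # struct format is fixed when the list is created, so a slot may change
    # type, but a new value cannot need more storage than was reserved for
    # that slot.  Illustrative helper; not called by the tests.
    @staticmethod
    def _example_shareablelist_storage():
        sl = shared_memory.ShareableList(['12345678', 0])
        try:
            sl[0] = 'tiny'      # fits in the 8 bytes reserved for slot 0
            sl[1] = None        # a type change within a slot is allowed
        finally:
            sl.shm.close()
            sl.shm.unlink()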
    def test_shared_memory_ShareableList_pickling(self):
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.subTest(proto=proto):
                sl = shared_memory.ShareableList(range(10))
                self.addCleanup(sl.shm.unlink)

                serialized_sl = pickle.dumps(sl, protocol=proto)
                deserialized_sl = pickle.loads(serialized_sl)
                self.assertIsInstance(
                    deserialized_sl, shared_memory.ShareableList)
                self.assertEqual(deserialized_sl[-1], 9)
                self.assertIsNot(sl, deserialized_sl)

                deserialized_sl[4] = "changed"
                self.assertEqual(sl[4], "changed")
                sl[3] = "newvalue"
                self.assertEqual(deserialized_sl[3], "newvalue")

                larger_sl = shared_memory.ShareableList(range(400))
                self.addCleanup(larger_sl.shm.unlink)
                serialized_larger_sl = pickle.dumps(larger_sl, protocol=proto)
                self.assertEqual(len(serialized_sl), len(serialized_larger_sl))
                larger_sl.shm.close()

                deserialized_sl.shm.close()
                sl.shm.close()

    def test_shared_memory_ShareableList_pickling_dead_object(self):
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.subTest(proto=proto):
                sl = shared_memory.ShareableList(range(10))
                serialized_sl = pickle.dumps(sl, protocol=proto)

                # Now, we are going to kill the original object.
                # So, the unpickled one won't be able to attach to it.
                sl.shm.close()
                sl.shm.unlink()

                with self.assertRaises(FileNotFoundError):
                    pickle.loads(serialized_sl)

    def test_shared_memory_cleaned_after_process_termination(self):
        cmd = '''if 1:
            import os, time, sys
            from multiprocessing import shared_memory

            # Create a shared_memory segment, and send the segment name
            sm = shared_memory.SharedMemory(create=True, size=10)
            sys.stdout.write(sm.name + '\\n')
            sys.stdout.flush()
            time.sleep(100)
        '''
        with subprocess.Popen([sys.executable, '-E', '-c', cmd],
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE) as p:
            name = p.stdout.readline().strip().decode()

            # Abruptly killing a process that holds a reference to a shared
            # memory segment should not leak that segment.
            p.terminate()
            p.wait()

            err_msg = ("A SharedMemory segment was leaked after "
                       "a process was abruptly terminated")
            for _ in support.sleeping_retry(support.LONG_TIMEOUT, err_msg):
                try:
                    smm = shared_memory.SharedMemory(name, create=False)
                except FileNotFoundError:
                    break

            if os.name == 'posix':
                # Without this line it was raising warnings like:
                #   UserWarning: resource_tracker:
                #   There appear to be 1 leaked shared_memory
                #   objects to clean up at shutdown
                # See: https://bugs.python.org/issue45209
                resource_tracker.unregister(f"/{name}", "shared_memory")

                # A warning was emitted by the subprocess' own
                # resource_tracker (on Windows, shared memory segments
                # are released automatically by the OS).
                err = p.stderr.read().decode()
                self.assertIn(
                    "resource_tracker: There appear to be 1 leaked "
                    "shared_memory objects to clean up at shutdown", err)

    @unittest.skipIf(os.name != "posix", "resource_tracker is posix only")
    def test_shared_memory_untracking(self):
        # gh-82300: When a separate Python process accesses shared memory
        # with track=False, it must not cause the memory to be deleted
        # when terminating.
        cmd = '''if 1:
            import sys
            from multiprocessing.shared_memory import SharedMemory
            mem = SharedMemory(create=False, name=sys.argv[1], track=False)
            mem.close()
        '''
        mem = shared_memory.SharedMemory(create=True, size=10)
4660        # The resource tracker shares its pipes with the subprocess, so
4661        # err being readable means the tracker has terminated by now.
4662        try:
4663            rc, out, err = script_helper.assert_python_ok("-c", cmd, mem.name)
4664            self.assertNotIn(b"resource_tracker", err)
4665            self.assertEqual(rc, 0)
4666            mem2 = shared_memory.SharedMemory(create=False, name=mem.name)
4667            mem2.close()
4668        finally:
4669            try:
4670                mem.unlink()
4671            except OSError:
4672                pass
4673            mem.close()
4674
4675    @unittest.skipIf(os.name != "posix", "resource_tracker is posix only")
4676    def test_shared_memory_tracking(self):
4677        # gh-82300: When a separate Python process accesses shared memory
4678        # with track=True, it must cause the memory to be deleted when
4679        # terminating.
4680        cmd = '''if 1:
4681            import sys
4682            from multiprocessing.shared_memory import SharedMemory
4683            mem = SharedMemory(create=False, name=sys.argv[1], track=True)
4684            mem.close()
4685        '''
4686        mem = shared_memory.SharedMemory(create=True, size=10)
4687        try:
4688            rc, out, err = script_helper.assert_python_ok("-c", cmd, mem.name)
4689            self.assertEqual(rc, 0)
4690            self.assertIn(
4691                b"resource_tracker: There appear to be 1 leaked "
4692                b"shared_memory objects to clean up at shutdown", err)
4693        finally:
4694            try:
4695                mem.unlink()
4696            except OSError:
4697                pass
4698            resource_tracker.unregister(mem._name, "shared_memory")
4699            mem.close()
4700
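
# An illustrative sketch (not part of the test suite) of the track flag
# exercised above: attaching with track=False keeps the segment out of this
# process's resource tracker, so closing it never schedules the memory for
# deletion.  The helper name below is hypothetical; POSIX only.
def _example_untracked_attach(name):
    mem = shared_memory.SharedMemory(name=name, create=False, track=False)
    try:
        return bytes(mem.buf[:4])  # read without taking ownership
    finally:
        mem.close()                # close, but never unlink
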
4701#
4702# Test to verify that `Finalize` works.
4703#
4704
4705class _TestFinalize(BaseTestCase):
4706
4707    ALLOWED_TYPES = ('processes',)
4708
4709    def setUp(self):
4710        self.registry_backup = util._finalizer_registry.copy()
4711        util._finalizer_registry.clear()
4712
4713    def tearDown(self):
4714        gc.collect()  # For PyPy or other GCs.
4715        self.assertFalse(util._finalizer_registry)
4716        util._finalizer_registry.update(self.registry_backup)
4717
4718    @classmethod
4719    def _test_finalize(cls, conn):
4720        class Foo(object):
4721            pass
4722
4723        a = Foo()
4724        util.Finalize(a, conn.send, args=('a',))
4725        del a           # triggers callback for a
4726        gc.collect()  # For PyPy or other GCs.
4727
4728        b = Foo()
4729        close_b = util.Finalize(b, conn.send, args=('b',))
4730        close_b()       # triggers callback for b
4731        close_b()       # does nothing because callback has already been called
4732        del b           # does nothing because callback has already been called
4733        gc.collect()  # For PyPy or other GCs.
4734
4735        c = Foo()
4736        util.Finalize(c, conn.send, args=('c',))
4737
4738        d10 = Foo()
4739        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
4740
4741        d01 = Foo()
4742        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
4743        d02 = Foo()
4744        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
4745        d03 = Foo()
4746        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
4747
4748        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
4749
4750        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
4751
4752        # call multiprocessing's cleanup function then exit process without
4753        # garbage collecting locals
4754        util._exit_function()
4755        conn.close()
4756        os._exit(0)
4757
4758    def test_finalize(self):
4759        conn, child_conn = self.Pipe()
4760
4761        p = self.Process(target=self._test_finalize, args=(child_conn,))
4762        p.daemon = True
4763        p.start()
4764        p.join()
4765
4766        result = list(iter(conn.recv, 'STOP'))
4767        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
4768
4769    @support.requires_resource('cpu')
4770    def test_thread_safety(self):
4771        # bpo-24484: _run_finalizers() should be thread-safe
4772        def cb():
4773            pass
4774
4775        class Foo(object):
4776            def __init__(self):
4777                self.ref = self  # create reference cycle
4778                # insert finalizer at random key
4779                util.Finalize(self, cb, exitpriority=random.randint(1, 100))
4780
4781        finish = False
4782        exc = None
4783
4784        def run_finalizers():
4785            nonlocal exc
4786            while not finish:
4787                time.sleep(random.random() * 1e-1)
4788                try:
4789                    # A GC run will eventually happen during this,
4790                    # collecting stale Foo's and mutating the registry
4791                    util._run_finalizers()
4792                except Exception as e:
4793                    exc = e
4794
4795        def make_finalizers():
4796            nonlocal exc
4797            d = {}
4798            while not finish:
4799                try:
4800                    # Old Foo's get gradually replaced and later
4801                    # collected by the GC (because of the cyclic ref)
4802                    d[random.getrandbits(5)] = {Foo() for i in range(10)}
4803                except Exception as e:
4804                    exc = e
4805                    d.clear()
4806
4807        old_interval = sys.getswitchinterval()
4808        old_threshold = gc.get_threshold()
4809        try:
4810            support.setswitchinterval(1e-6)
4811            gc.set_threshold(5, 5, 5)
4812            threads = [threading.Thread(target=run_finalizers),
4813                       threading.Thread(target=make_finalizers)]
4814            with threading_helper.start_threads(threads):
4815                time.sleep(4.0)  # Wait a bit to trigger race condition
4816                finish = True
4817            if exc is not None:
4818                raise exc
4819        finally:
4820            sys.setswitchinterval(old_interval)
4821            gc.set_threshold(*old_threshold)
4822            gc.collect()  # Collect remaining Foo's
4823
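
# An illustrative sketch (not part of the test suite) of the ordering that
# test_finalize checks: finalizers with a higher exitpriority run first,
# and a finalizer attached to an object also fires when that object is
# garbage collected.  The helper name is hypothetical; calling it outside
# a scratch process would also run any other registered finalizers.
def _example_finalize_ordering(messages):
    class Owner:
        pass
    a = Owner()
    util.Finalize(a, messages.append, args=('collected',))
    util.Finalize(None, messages.append, args=('late',), exitpriority=-10)
    util.Finalize(None, messages.append, args=('early',), exitpriority=10)
    del a                    # fires 'collected' (immediately on CPython)
    gc.collect()             # for GCs without reference counting
    util._run_finalizers()   # fires 'early', then 'late'
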
4824
4825#
4826# Test that from ... import * works for each module
4827#
4828
4829class _TestImportStar(unittest.TestCase):
4830
4831    def get_module_names(self):
4832        import glob
4833        folder = os.path.dirname(multiprocessing.__file__)
4834        pattern = os.path.join(glob.escape(folder), '*.py')
4835        files = glob.glob(pattern)
4836        modules = [os.path.splitext(os.path.split(f)[1])[0] for f in files]
4837        modules = ['multiprocessing.' + m for m in modules]
4838        modules.remove('multiprocessing.__init__')
4839        modules.append('multiprocessing')
4840        return modules
4841
4842    def test_import(self):
4843        modules = self.get_module_names()
4844        if sys.platform == 'win32':
4845            modules.remove('multiprocessing.popen_fork')
4846            modules.remove('multiprocessing.popen_forkserver')
4847            modules.remove('multiprocessing.popen_spawn_posix')
4848        else:
4849            modules.remove('multiprocessing.popen_spawn_win32')
4850            if not HAS_REDUCTION:
4851                modules.remove('multiprocessing.popen_forkserver')
4852
4853        if c_int is None:
4854            # This module requires _ctypes
4855            modules.remove('multiprocessing.sharedctypes')
4856
4857        for name in modules:
4858            __import__(name)
4859            mod = sys.modules[name]
4860            self.assertTrue(hasattr(mod, '__all__'), name)
4861
4862            for attr in mod.__all__:
4863                self.assertTrue(
4864                    hasattr(mod, attr),
4865                    '%r does not have attribute %r' % (mod, attr)
4866                    )
4867
4868#
4869# Quick test that logging works -- does not test logging output
4870#
4871
4872class _TestLogging(BaseTestCase):
4873
4874    ALLOWED_TYPES = ('processes',)
4875
4876    def test_enable_logging(self):
4877        logger = multiprocessing.get_logger()
4878        logger.setLevel(util.SUBWARNING)
4879        self.assertIsNotNone(logger)
4880        logger.debug('this will not be printed')
4881        logger.info('nor will this')
4882        logger.setLevel(LOG_LEVEL)
4883
4884    @classmethod
4885    def _test_level(cls, conn):
4886        logger = multiprocessing.get_logger()
4887        conn.send(logger.getEffectiveLevel())
4888
4889    def test_level(self):
4890        LEVEL1 = 32
4891        LEVEL2 = 37
4892
4893        logger = multiprocessing.get_logger()
4894        root_logger = logging.getLogger()
4895        root_level = root_logger.level
4896
4897        reader, writer = multiprocessing.Pipe(duplex=False)
4898
4899        logger.setLevel(LEVEL1)
4900        p = self.Process(target=self._test_level, args=(writer,))
4901        p.start()
4902        self.assertEqual(LEVEL1, reader.recv())
4903        p.join()
4904        p.close()
4905
4906        logger.setLevel(logging.NOTSET)
4907        root_logger.setLevel(LEVEL2)
4908        p = self.Process(target=self._test_level, args=(writer,))
4909        p.start()
4910        self.assertEqual(LEVEL2, reader.recv())
4911        p.join()
4912        p.close()
4913
4914        root_logger.setLevel(root_level)
4915        logger.setLevel(LOG_LEVEL)
4916
4917    def test_filename(self):
4918        logger = multiprocessing.get_logger()
4919        original_level = logger.level
4920        try:
4921            logger.setLevel(util.DEBUG)
4922            stream = io.StringIO()
4923            handler = logging.StreamHandler(stream)
4924            logging_format = '[%(levelname)s] [%(filename)s] %(message)s'
4925            handler.setFormatter(logging.Formatter(logging_format))
4926            logger.addHandler(handler)
4927            logger.info('1')
4928            util.info('2')
4929            logger.debug('3')
4930            filename = os.path.basename(__file__)
4931            log_record = stream.getvalue()
4932            self.assertIn(f'[INFO] [{filename}] 1', log_record)
4933            self.assertIn(f'[INFO] [{filename}] 2', log_record)
4934            self.assertIn(f'[DEBUG] [{filename}] 3', log_record)
4935        finally:
4936            logger.setLevel(original_level)
4937            logger.removeHandler(handler)
4938            handler.close()
4939
4940
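# An illustrative sketch (not part of the test suite) of the logging API
# tested above: multiprocessing keeps a dedicated 'multiprocessing' logger,
# and log_to_stderr() attaches a stream handler to it.  The helper name is
# hypothetical.
def _example_multiprocessing_logging():
    logger = multiprocessing.log_to_stderr()
    logger.setLevel(logging.INFO)
    logger.info('messages at INFO and above now appear on stderr')
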
4941# class _TestLoggingProcessName(BaseTestCase):
4942#
4943#     def handle(self, record):
4944#         assert record.processName == multiprocessing.current_process().name
4945#         self.__handled = True
4946#
4947#     def test_logging(self):
4948#         handler = logging.Handler()
4949#         handler.handle = self.handle
4950#         self.__handled = False
4951#         # Bypass getLogger() and side-effects
4952#         logger = logging.getLoggerClass()(
4953#                 'multiprocessing.test.TestLoggingProcessName')
4954#         logger.addHandler(handler)
4955#         logger.propagate = False
4956#
4957#         logger.warn('foo')
4958#         assert self.__handled
4959
4960#
4961# Check that Process.join() retries if os.waitpid() fails with EINTR
4962#
4963
4964class _TestPollEintr(BaseTestCase):
4965
4966    ALLOWED_TYPES = ('processes',)
4967
4968    @classmethod
4969    def _killer(cls, pid):
4970        time.sleep(0.1)
4971        os.kill(pid, signal.SIGUSR1)
4972
4973    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
4974    def test_poll_eintr(self):
4975        got_signal = [False]
4976        def record(*args):
4977            got_signal[0] = True
4978        pid = os.getpid()
4979        oldhandler = signal.signal(signal.SIGUSR1, record)
4980        try:
4981            killer = self.Process(target=self._killer, args=(pid,))
4982            killer.start()
4983            try:
4984                p = self.Process(target=time.sleep, args=(2,))
4985                p.start()
4986                p.join()
4987            finally:
4988                killer.join()
4989            self.assertTrue(got_signal[0])
4990            self.assertEqual(p.exitcode, 0)
4991        finally:
4992            signal.signal(signal.SIGUSR1, oldhandler)
4993
4994#
4995# Test that invalid connection handles are rejected, see issue 3321
4996#
4997
4998class TestInvalidHandle(unittest.TestCase):
4999
5000    @unittest.skipIf(WIN32, "skipped on Windows")
5001    def test_invalid_handles(self):
5002        conn = multiprocessing.connection.Connection(44977608)
5003        # check that poll() doesn't crash
5004        try:
5005            conn.poll()
5006        except (ValueError, OSError):
5007            pass
5008        finally:
5009            # Hack private attribute _handle to avoid printing an error
5010            # in conn.__del__
5011            conn._handle = None
5012        self.assertRaises((ValueError, OSError),
5013                          multiprocessing.connection.Connection, -1)
5014
5015
5016
5017@hashlib_helper.requires_hashdigest('sha256')
5018class OtherTest(unittest.TestCase):
5019    # TODO: add more tests for deliver/answer challenge.
5020    def test_deliver_challenge_auth_failure(self):
5021        class _FakeConnection(object):
5022            def recv_bytes(self, size):
5023                return b'something bogus'
5024            def send_bytes(self, data):
5025                pass
5026        self.assertRaises(multiprocessing.AuthenticationError,
5027                          multiprocessing.connection.deliver_challenge,
5028                          _FakeConnection(), b'abc')
5029
5030    def test_answer_challenge_auth_failure(self):
5031        class _FakeConnection(object):
5032            def __init__(self):
5033                self.count = 0
5034            def recv_bytes(self, size):
5035                self.count += 1
5036                if self.count == 1:
5037                    return multiprocessing.connection._CHALLENGE
5038                elif self.count == 2:
5039                    return b'something bogus'
5040                return b''
5041            def send_bytes(self, data):
5042                pass
5043        self.assertRaises(multiprocessing.AuthenticationError,
5044                          multiprocessing.connection.answer_challenge,
5045                          _FakeConnection(), b'abc')
5046
5047
5048@hashlib_helper.requires_hashdigest('md5')
5049@hashlib_helper.requires_hashdigest('sha256')
5050class ChallengeResponseTest(unittest.TestCase):
5051    authkey = b'supadupasecretkey'
5052
5053    def create_response(self, message):
5054        return multiprocessing.connection._create_response(
5055            self.authkey, message
5056        )
5057
5058    def verify_challenge(self, message, response):
5059        return multiprocessing.connection._verify_challenge(
5060            self.authkey, message, response
5061        )
5062
5063    def test_challengeresponse(self):
5064        for algo in [None, "md5", "sha256"]:
5065            with self.subTest(f"{algo=}"):
5066                msg = b'is-twenty-bytes-long'  # The length of a legacy message.
5067                if algo:
5068                    prefix = b'{%s}' % algo.encode("ascii")
5069                else:
5070                    prefix = b''
5071                msg = prefix + msg
5072                response = self.create_response(msg)
5073                if not response.startswith(prefix):
5074                    self.fail(response)
5075                self.verify_challenge(msg, response)
5076
5077    # TODO(gpshead): We need integration tests for handshakes between modern
5078    # deliver_challenge() and verify_response() code and connections running a
5079    # test-local copy of the legacy Python <=3.11 implementations.
5080
5081    # TODO(gpshead): properly annotate tests for requires_hashdigest rather than
5082    # only running these on a platform supporting everything.  otherwise logic
5083    # issues preventing it from working on FIPS mode setups will be hidden.
5084
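# An illustrative sketch (not part of the test suite) of the scheme the
# helpers above exercise: the listener sends a random challenge, the client
# answers with an HMAC of it keyed by the shared authkey, and the listener
# recomputes the digest and compares.  As test_challengeresponse shows,
# modern messages carry a digest-name prefix such as b'{sha256}'.  The
# helper name is hypothetical.
def _example_hmac_answer(authkey, challenge):
    import hmac
    return hmac.new(authkey, challenge, 'sha256').digest()
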
5085#
5086# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
5087#
5088
5089def initializer(ns):
5090    ns.test += 1
5091
5092@hashlib_helper.requires_hashdigest('sha256')
5093class TestInitializers(unittest.TestCase):
5094    def setUp(self):
5095        self.mgr = multiprocessing.Manager()
5096        self.ns = self.mgr.Namespace()
5097        self.ns.test = 0
5098
5099    def tearDown(self):
5100        self.mgr.shutdown()
5101        self.mgr.join()
5102
5103    def test_manager_initializer(self):
5104        m = multiprocessing.managers.SyncManager()
5105        self.assertRaises(TypeError, m.start, 1)
5106        m.start(initializer, (self.ns,))
5107        self.assertEqual(self.ns.test, 1)
5108        m.shutdown()
5109        m.join()
5110
5111    def test_pool_initializer(self):
5112        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
5113        p = multiprocessing.Pool(1, initializer, (self.ns,))
5114        p.close()
5115        p.join()
5116        self.assertEqual(self.ns.test, 1)
5117
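# An illustrative sketch (not part of the test suite) of the initializer
# feature tested above: Pool (like Manager.start()) calls
# initializer(*initargs) once in each worker before any task runs.  The
# helper name is hypothetical.
def _example_pool_initializer():
    with multiprocessing.Pool(2, initializer=print,
                              initargs=('worker starting',)) as pool:
        return pool.map(abs, [-1, 2, -3])  # each worker prints once first
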
5118#
5119# Issues 5155, 5313, 5331: test creating processes from within processes.
5120# Verifies os.close(sys.stdin.fileno()) vs. sys.stdin.close() behavior.
5121#
5122
5123def _this_sub_process(q):
5124    try:
5125        item = q.get(block=False)
5126    except pyqueue.Empty:
5127        pass
5128
5129def _test_process():
5130    queue = multiprocessing.Queue()
5131    subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,))
5132    subProc.daemon = True
5133    subProc.start()
5134    subProc.join()
5135
5136def _afunc(x):
5137    return x*x
5138
5139def pool_in_process():
5140    pool = multiprocessing.Pool(processes=4)
5141    x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
5142    pool.close()
5143    pool.join()
5144
5145class _file_like(object):
5146    def __init__(self, delegate):
5147        self._delegate = delegate
5148        self._pid = None
5149
5150    @property
5151    def cache(self):
5152        pid = os.getpid()
5153        # There are no race conditions since fork() preserves only the calling thread
5154        if pid != self._pid:
5155            self._pid = pid
5156            self._cache = []
5157        return self._cache
5158
5159    def write(self, data):
5160        self.cache.append(data)
5161
5162    def flush(self):
5163        self._delegate.write(''.join(self.cache))
5164        self._cache = []
5165
5166class TestStdinBadfiledescriptor(unittest.TestCase):
5167
5168    def test_queue_in_process(self):
5169        proc = multiprocessing.Process(target=_test_process)
5170        proc.start()
5171        proc.join()
5172
5173    def test_pool_in_process(self):
5174        p = multiprocessing.Process(target=pool_in_process)
5175        p.start()
5176        p.join()
5177
5178    def test_flushing(self):
5179        sio = io.StringIO()
5180        flike = _file_like(sio)
5181        flike.write('foo')
5182        proc = multiprocessing.Process(target=lambda: flike.flush())
5183        flike.flush()
5184        self.assertEqual(sio.getvalue(), 'foo')
5185
5186
5187class TestWait(unittest.TestCase):
5188
5189    @classmethod
5190    def _child_test_wait(cls, w, slow):
5191        for i in range(10):
5192            if slow:
5193                time.sleep(random.random() * 0.100)
5194            w.send((i, os.getpid()))
5195        w.close()
5196
5197    def test_wait(self, slow=False):
5198        from multiprocessing.connection import wait
5199        readers = []
5200        procs = []
5201        messages = []
5202
5203        for i in range(4):
5204            r, w = multiprocessing.Pipe(duplex=False)
5205            p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow))
5206            p.daemon = True
5207            p.start()
5208            w.close()
5209            readers.append(r)
5210            procs.append(p)
5211            self.addCleanup(p.join)
5212
5213        while readers:
5214            for r in wait(readers):
5215                try:
5216                    msg = r.recv()
5217                except EOFError:
5218                    readers.remove(r)
5219                    r.close()
5220                else:
5221                    messages.append(msg)
5222
5223        messages.sort()
5224        expected = sorted((i, p.pid) for i in range(10) for p in procs)
5225        self.assertEqual(messages, expected)
5226
5227    @classmethod
5228    def _child_test_wait_socket(cls, address, slow):
5229        s = socket.socket()
5230        s.connect(address)
5231        for i in range(10):
5232            if slow:
5233                time.sleep(random.random() * 0.100)
5234            s.sendall(('%s\n' % i).encode('ascii'))
5235        s.close()
5236
5237    def test_wait_socket(self, slow=False):
5238        from multiprocessing.connection import wait
5239        l = socket.create_server((socket_helper.HOST, 0))
5240        addr = l.getsockname()
5241        readers = []
5242        procs = []
5243        dic = {}
5244
5245        for i in range(4):
5246            p = multiprocessing.Process(target=self._child_test_wait_socket,
5247                                        args=(addr, slow))
5248            p.daemon = True
5249            p.start()
5250            procs.append(p)
5251            self.addCleanup(p.join)
5252
5253        for i in range(4):
5254            r, _ = l.accept()
5255            readers.append(r)
5256            dic[r] = []
5257        l.close()
5258
5259        while readers:
5260            for r in wait(readers):
5261                msg = r.recv(32)
5262                if not msg:
5263                    readers.remove(r)
5264                    r.close()
5265                else:
5266                    dic[r].append(msg)
5267
5268        expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
5269        for v in dic.values():
5270            self.assertEqual(b''.join(v), expected)
5271
5272    def test_wait_slow(self):
5273        self.test_wait(True)
5274
5275    def test_wait_socket_slow(self):
5276        self.test_wait_socket(True)
5277
5278    @support.requires_resource('walltime')
5279    def test_wait_timeout(self):
5280        from multiprocessing.connection import wait
5281
5282        timeout = 5.0  # seconds
5283        a, b = multiprocessing.Pipe()
5284
5285        start = time.monotonic()
5286        res = wait([a, b], timeout)
5287        delta = time.monotonic() - start
5288
5289        self.assertEqual(res, [])
5290        self.assertGreater(delta, timeout - CLOCK_RES)
5291
5292        b.send(None)
5293        res = wait([a, b], 20)
5294        self.assertEqual(res, [a])
5295
5296    @classmethod
5297    def signal_and_sleep(cls, sem, period):
5298        sem.release()
5299        time.sleep(period)
5300
5301    @support.requires_resource('walltime')
5302    def test_wait_integer(self):
5303        from multiprocessing.connection import wait
5304
5305        expected = 3
5306        sorted_ = lambda l: sorted(l, key=lambda x: id(x))
5307        sem = multiprocessing.Semaphore(0)
5308        a, b = multiprocessing.Pipe()
5309        p = multiprocessing.Process(target=self.signal_and_sleep,
5310                                    args=(sem, expected))
5311
5312        p.start()
5313        self.assertIsInstance(p.sentinel, int)
5314        self.assertTrue(sem.acquire(timeout=20))
5315
5316        start = time.monotonic()
5317        res = wait([a, p.sentinel, b], expected + 20)
5318        delta = time.monotonic() - start
5319
5320        self.assertEqual(res, [p.sentinel])
5321        self.assertLess(delta, expected + 2)
5322        self.assertGreater(delta, expected - 2)
5323
5324        a.send(None)
5325
5326        start = time.monotonic()
5327        res = wait([a, p.sentinel, b], 20)
5328        delta = time.monotonic() - start
5329
5330        self.assertEqual(sorted_(res), sorted_([p.sentinel, b]))
5331        self.assertLess(delta, 0.4)
5332
5333        b.send(None)
5334
5335        start = time.monotonic()
5336        res = wait([a, p.sentinel, b], 20)
5337        delta = time.monotonic() - start
5338
5339        self.assertEqual(sorted_(res), sorted_([a, p.sentinel, b]))
5340        self.assertLess(delta, 0.4)
5341
5342        p.terminate()
5343        p.join()
5344
5345    def test_neg_timeout(self):
5346        from multiprocessing.connection import wait
5347        a, b = multiprocessing.Pipe()
5348        t = time.monotonic()
5349        res = wait([a], timeout=-1)
5350        t = time.monotonic() - t
5351        self.assertEqual(res, [])
5352        self.assertLess(t, 1)
5353        a.close()
5354        b.close()
5355
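# An illustrative sketch (not part of the test suite) of the multiplexing
# pattern TestWait exercises: connection.wait() accepts a mix of readable
# Connections and Process.sentinel handles and returns whichever are ready.
# The helper name is hypothetical.
def _example_wait_multiplex():
    r, w = multiprocessing.Pipe(duplex=False)
    p = multiprocessing.Process(target=time.sleep, args=(0.1,))
    p.start()
    w.send('hello')
    ready = wait([r, p.sentinel], timeout=support.SHORT_TIMEOUT)
    assert r in ready          # the pipe is readable immediately
    assert r.recv() == 'hello'
    p.join()
    r.close()
    w.close()
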
5356#
5357# Issue 14151: Listener should reject an address family invalid for the platform
5358#
5359
5360class TestInvalidFamily(unittest.TestCase):
5361
5362    @unittest.skipIf(WIN32, "skipped on Windows")
5363    def test_invalid_family(self):
5364        with self.assertRaises(ValueError):
5365            multiprocessing.connection.Listener(r'\\.\test')
5366
5367    @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
5368    def test_invalid_family_win32(self):
5369        with self.assertRaises(ValueError):
5370            multiprocessing.connection.Listener('/var/test.pipe')
5371
5372#
5373# Issue 12098: check sys.flags of child matches that for parent
5374#
5375
5376class TestFlags(unittest.TestCase):
5377    @classmethod
5378    def run_in_grandchild(cls, conn):
5379        conn.send(tuple(sys.flags))
5380
5381    @classmethod
5382    def run_in_child(cls, start_method):
5383        import json
5384        mp = multiprocessing.get_context(start_method)
5385        r, w = mp.Pipe(duplex=False)
5386        p = mp.Process(target=cls.run_in_grandchild, args=(w,))
5387        with warnings.catch_warnings(category=DeprecationWarning):
5388            p.start()
5389        grandchild_flags = r.recv()
5390        p.join()
5391        r.close()
5392        w.close()
5393        flags = (tuple(sys.flags), grandchild_flags)
5394        print(json.dumps(flags))
5395
5396    def test_flags(self):
5397        import json
5398        # start child process using unusual flags
5399        prog = (
5400            'from test._test_multiprocessing import TestFlags; '
5401            f'TestFlags.run_in_child({multiprocessing.get_start_method()!r})'
5402        )
5403        data = subprocess.check_output(
5404            [sys.executable, '-E', '-S', '-O', '-c', prog])
5405        child_flags, grandchild_flags = json.loads(data.decode('ascii'))
5406        self.assertEqual(child_flags, grandchild_flags)
5407
5408#
5409# Test interaction with socket timeouts - see Issue #6056
5410#
5411
5412class TestTimeouts(unittest.TestCase):
5413    @classmethod
5414    def _test_timeout(cls, child, address):
5415        time.sleep(1)
5416        child.send(123)
5417        child.close()
5418        conn = multiprocessing.connection.Client(address)
5419        conn.send(456)
5420        conn.close()
5421
5422    def test_timeout(self):
5423        old_timeout = socket.getdefaulttimeout()
5424        try:
5425            socket.setdefaulttimeout(0.1)
5426            parent, child = multiprocessing.Pipe(duplex=True)
5427            l = multiprocessing.connection.Listener(family='AF_INET')
5428            p = multiprocessing.Process(target=self._test_timeout,
5429                                        args=(child, l.address))
5430            p.start()
5431            child.close()
5432            self.assertEqual(parent.recv(), 123)
5433            parent.close()
5434            conn = l.accept()
5435            self.assertEqual(conn.recv(), 456)
5436            conn.close()
5437            l.close()
5438            join_process(p)
5439        finally:
5440            socket.setdefaulttimeout(old_timeout)
5441
5442#
5443# Test what happens with no "if __name__ == '__main__'"
5444#
5445
5446class TestNoForkBomb(unittest.TestCase):
5447    def test_noforkbomb(self):
5448        sm = multiprocessing.get_start_method()
5449        name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
5450        if sm != 'fork':
5451            rc, out, err = test.support.script_helper.assert_python_failure(name, sm)
5452            self.assertEqual(out, b'')
5453            self.assertIn(b'RuntimeError', err)
5454        else:
5455            rc, out, err = test.support.script_helper.assert_python_ok(name, sm)
5456            self.assertEqual(out.rstrip(), b'123')
5457            self.assertEqual(err, b'')
5458
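# For reference, the guard whose absence mp_fork_bomb.py relies on: under
# the spawn and forkserver start methods the main module is re-imported in
# each child, so unguarded Process creation would recurse forever.  A
# correctly structured script looks like this (illustrative only):
#
#     import multiprocessing
#
#     def worker():
#         print(123)
#
#     if __name__ == '__main__':
#         p = multiprocessing.Process(target=worker)
#         p.start()
#         p.join()
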
5459#
5460# Issue #17555: ForkAwareThreadLock
5461#
5462
5463class TestForkAwareThreadLock(unittest.TestCase):
5464    # We recursively start processes.  Issue #17555 meant that the
5465    # after fork registry would get duplicate entries for the same
5466    # lock.  The size of the registry at generation n was ~2**n.
5467
5468    @classmethod
5469    def child(cls, n, conn):
5470        if n > 1:
5471            p = multiprocessing.Process(target=cls.child, args=(n-1, conn))
5472            p.start()
5473            conn.close()
5474            join_process(p)
5475        else:
5476            conn.send(len(util._afterfork_registry))
5477        conn.close()
5478
5479    def test_lock(self):
5480        r, w = multiprocessing.Pipe(False)
5481        l = util.ForkAwareThreadLock()
5482        old_size = len(util._afterfork_registry)
5483        p = multiprocessing.Process(target=self.child, args=(5, w))
5484        p.start()
5485        w.close()
5486        new_size = r.recv()
5487        join_process(p)
5488        self.assertLessEqual(new_size, old_size)
5489
5490#
5491# Check that non-forked child processes do not inherit unneeded fds/handles
5492#
5493
5494class TestCloseFds(unittest.TestCase):
5495
5496    def get_high_socket_fd(self):
5497        if WIN32:
5498            # The child process will not have any socket handles, so
5499            # calling socket.fromfd() should produce WSAENOTSOCK even
5500            # if there is a handle of the same number.
5501            return socket.socket().detach()
5502        else:
5503            # We want to produce a socket with an fd high enough that a
5504            # freshly created child process will not have any fds as high.
5505            fd = socket.socket().detach()
5506            to_close = []
5507            while fd < 50:
5508                to_close.append(fd)
5509                fd = os.dup(fd)
5510            for x in to_close:
5511                os.close(x)
5512            return fd
5513
5514    def close(self, fd):
5515        if WIN32:
5516            socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=fd).close()
5517        else:
5518            os.close(fd)
5519
5520    @classmethod
5521    def _test_closefds(cls, conn, fd):
5522        try:
5523            s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
5524        except Exception as e:
5525            conn.send(e)
5526        else:
5527            s.close()
5528            conn.send(None)
5529
5530    def test_closefd(self):
5531        if not HAS_REDUCTION:
5532            raise unittest.SkipTest('requires fd pickling')
5533
5534        reader, writer = multiprocessing.Pipe()
5535        fd = self.get_high_socket_fd()
5536        try:
5537            p = multiprocessing.Process(target=self._test_closefds,
5538                                        args=(writer, fd))
5539            p.start()
5540            writer.close()
5541            e = reader.recv()
5542            join_process(p)
5543        finally:
5544            self.close(fd)
5545            writer.close()
5546            reader.close()
5547
5548        if multiprocessing.get_start_method() == 'fork':
5549            self.assertIs(e, None)
5550        else:
5551            WSAENOTSOCK = 10038
5552            self.assertIsInstance(e, OSError)
5553            self.assertTrue(e.errno == errno.EBADF or
5554                            e.winerror == WSAENOTSOCK, e)
5555
5556#
5557# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
5558#
5559
5560class TestIgnoreEINTR(unittest.TestCase):
5561
5562    # Sending CONN_MAX_SIZE bytes into a multiprocessing pipe must block
5563    CONN_MAX_SIZE = max(support.PIPE_MAX_SIZE, support.SOCK_MAX_SIZE)
5564
5565    @classmethod
5566    def _test_ignore(cls, conn):
5567        def handler(signum, frame):
5568            pass
5569        signal.signal(signal.SIGUSR1, handler)
5570        conn.send('ready')
5571        x = conn.recv()
5572        conn.send(x)
5573        conn.send_bytes(b'x' * cls.CONN_MAX_SIZE)
5574
5575    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
5576    def test_ignore(self):
5577        conn, child_conn = multiprocessing.Pipe()
5578        try:
5579            p = multiprocessing.Process(target=self._test_ignore,
5580                                        args=(child_conn,))
5581            p.daemon = True
5582            p.start()
5583            child_conn.close()
5584            self.assertEqual(conn.recv(), 'ready')
5585            time.sleep(0.1)
5586            os.kill(p.pid, signal.SIGUSR1)
5587            time.sleep(0.1)
5588            conn.send(1234)
5589            self.assertEqual(conn.recv(), 1234)
5590            time.sleep(0.1)
5591            os.kill(p.pid, signal.SIGUSR1)
5592            self.assertEqual(conn.recv_bytes(), b'x' * self.CONN_MAX_SIZE)
5593            time.sleep(0.1)
5594            p.join()
5595        finally:
5596            conn.close()
5597
5598    @classmethod
5599    def _test_ignore_listener(cls, conn):
5600        def handler(signum, frame):
5601            pass
5602        signal.signal(signal.SIGUSR1, handler)
5603        with multiprocessing.connection.Listener() as l:
5604            conn.send(l.address)
5605            a = l.accept()
5606            a.send('welcome')
5607
5608    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
5609    def test_ignore_listener(self):
5610        conn, child_conn = multiprocessing.Pipe()
5611        try:
5612            p = multiprocessing.Process(target=self._test_ignore_listener,
5613                                        args=(child_conn,))
5614            p.daemon = True
5615            p.start()
5616            child_conn.close()
5617            address = conn.recv()
5618            time.sleep(0.1)
5619            os.kill(p.pid, signal.SIGUSR1)
5620            time.sleep(0.1)
5621            client = multiprocessing.connection.Client(address)
5622            self.assertEqual(client.recv(), 'welcome')
5623            p.join()
5624        finally:
5625            conn.close()
5626
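# An illustrative sketch (not part of the test suite) of the behaviour
# TestIgnoreEINTR checks: since PEP 475, blocking calls such as
# Connection.recv() are retried automatically after a signal handler runs.
# The helper name is hypothetical; it assumes a POSIX platform with SIGUSR1
# and that it is called from the main thread.
def _example_recv_retries_after_signal():
    signal.signal(signal.SIGUSR1, lambda signum, frame: None)
    r, w = multiprocessing.Pipe(duplex=False)
    threading.Timer(0.1, os.kill, (os.getpid(), signal.SIGUSR1)).start()
    threading.Timer(0.2, w.send, ('payload',)).start()
    assert r.recv() == 'payload'  # the signal does not raise OSError here
    r.close()
    w.close()
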
5627class TestStartMethod(unittest.TestCase):
5628    @classmethod
5629    def _check_context(cls, conn):
5630        conn.send(multiprocessing.get_start_method())
5631
5632    def check_context(self, ctx):
5633        r, w = ctx.Pipe(duplex=False)
5634        p = ctx.Process(target=self._check_context, args=(w,))
5635        p.start()
5636        w.close()
5637        child_method = r.recv()
5638        r.close()
5639        p.join()
5640        self.assertEqual(child_method, ctx.get_start_method())
5641
5642    def test_context(self):
5643        for method in ('fork', 'spawn', 'forkserver'):
5644            try:
5645                ctx = multiprocessing.get_context(method)
5646            except ValueError:
5647                continue
5648            self.assertEqual(ctx.get_start_method(), method)
5649            self.assertIs(ctx.get_context(), ctx)
5650            self.assertRaises(ValueError, ctx.set_start_method, 'spawn')
5651            self.assertRaises(ValueError, ctx.set_start_method, None)
5652            self.check_context(ctx)
5653
5654    def test_context_check_module_types(self):
5655        try:
5656            ctx = multiprocessing.get_context('forkserver')
5657        except ValueError:
5658            raise unittest.SkipTest('forkserver start method not available')
5659        with self.assertRaisesRegex(TypeError, 'module_names must be a list of strings'):
5660            ctx.set_forkserver_preload([1, 2, 3])
5661
5662    def test_set_get(self):
5663        multiprocessing.set_forkserver_preload(PRELOAD)
5664        count = 0
5665        old_method = multiprocessing.get_start_method()
5666        try:
5667            for method in ('fork', 'spawn', 'forkserver'):
5668                try:
5669                    multiprocessing.set_start_method(method, force=True)
5670                except ValueError:
5671                    continue
5672                self.assertEqual(multiprocessing.get_start_method(), method)
5673                ctx = multiprocessing.get_context()
5674                self.assertEqual(ctx.get_start_method(), method)
5675                self.assertTrue(type(ctx).__name__.lower().startswith(method))
5676                self.assertTrue(
5677                    ctx.Process.__name__.lower().startswith(method))
5678                self.check_context(multiprocessing)
5679                count += 1
5680        finally:
5681            multiprocessing.set_start_method(old_method, force=True)
5682        self.assertGreaterEqual(count, 1)
5683
5684    def test_get_all(self):
5685        methods = multiprocessing.get_all_start_methods()
5686        if sys.platform == 'win32':
5687            self.assertEqual(methods, ['spawn'])
5688        else:
5689            self.assertIn(methods, [['fork', 'spawn'],
5690                                    ['spawn', 'fork'],
5691                                    ['fork', 'spawn', 'forkserver'],
5692                                    ['spawn', 'fork', 'forkserver']])
5693
5694    def test_preload_resources(self):
5695        if multiprocessing.get_start_method() != 'forkserver':
5696            self.skipTest("test only relevant for 'forkserver' method")
5697        name = os.path.join(os.path.dirname(__file__), 'mp_preload.py')
5698        rc, out, err = test.support.script_helper.assert_python_ok(name)
5699        out = out.decode()
5700        err = err.decode()
5701        if out.rstrip() != 'ok' or err != '':
5702            print(out)
5703            print(err)
5704            self.fail("failed spawning forkserver or grandchild")
5705
5706    @unittest.skipIf(sys.platform == "win32",
5707                     "only spawn is available on Windows, so no risk of mixing")
5708    @only_run_in_spawn_testsuite("avoids redundant testing.")
5709    def test_mixed_startmethod(self):
5710        # Fork-based locks cannot be used with spawned processes
5711        for process_method in ["spawn", "forkserver"]:
5712            queue = multiprocessing.get_context("fork").Queue()
5713            process_ctx = multiprocessing.get_context(process_method)
5714            p = process_ctx.Process(target=close_queue, args=(queue,))
5715            err_msg = "A SemLock created in a fork"
5716            with self.assertRaisesRegex(RuntimeError, err_msg):
5717                p.start()
5718
5719        # non-fork-based locks can be used with all other start methods
5720        for queue_method in ["spawn", "forkserver"]:
5721            for process_method in multiprocessing.get_all_start_methods():
5722                queue = multiprocessing.get_context(queue_method).Queue()
5723                process_ctx = multiprocessing.get_context(process_method)
5724                p = process_ctx.Process(target=close_queue, args=(queue,))
5725                p.start()
5726                p.join()
5727
5728    @classmethod
5729    def _put_one_in_queue(cls, queue):
5730        queue.put(1)
5731
5732    @classmethod
5733    def _put_two_and_nest_once(cls, queue):
5734        queue.put(2)
5735        process = multiprocessing.Process(target=cls._put_one_in_queue, args=(queue,))
5736        process.start()
5737        process.join()
5738
5739    def test_nested_startmethod(self):
5740        # gh-108520: Regression test to ensure that child process can send its
5741        # arguments to another process
5742        queue = multiprocessing.Queue()
5743
5744        process = multiprocessing.Process(target=self._put_two_and_nest_once, args=(queue,))
5745        process.start()
5746        process.join()
5747
5748        results = []
5749        while not queue.empty():
5750            results.append(queue.get())
5751
5752        # gh-109706: queue.put(1) can write into the queue before queue.put(2),
5753        # there is no synchronization in the test.
5754        self.assertSetEqual(set(results), {1, 2})
5755
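# An illustrative sketch (not part of the test suite) of the context API
# TestStartMethod exercises above: get_context() returns an object exposing
# the multiprocessing API bound to one start method, letting a library pick
# a method without changing the global default.  The helper name is
# hypothetical.
def _example_context_usage():
    ctx = multiprocessing.get_context('spawn')
    assert ctx.get_start_method() == 'spawn'
    with ctx.Pool(2) as pool:
        assert pool.map(abs, [-1, 2, -3]) == [1, 2, 3]
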
5756
5757@unittest.skipIf(sys.platform == "win32",
5758                 "test semantics don't make sense on Windows")
5759class TestResourceTracker(unittest.TestCase):
5760
5761    def test_resource_tracker(self):
5762        #
5763        # Check that killing process does not leak named semaphores
5764        #
5765        cmd = '''if 1:
5766            import time, os
5767            import multiprocessing as mp
5768            from multiprocessing import resource_tracker
5769            from multiprocessing.shared_memory import SharedMemory
5770
5771            mp.set_start_method("spawn")
5772
5773
5774            def create_and_register_resource(rtype):
5775                if rtype == "semaphore":
5776                    lock = mp.Lock()
5777                    return lock, lock._semlock.name
5778                elif rtype == "shared_memory":
5779                    sm = SharedMemory(create=True, size=10)
5780                    return sm, sm._name
5781                else:
5782                    raise ValueError(
5783                        "Resource type {{}} not understood".format(rtype))
5784
5785
5786            resource1, rname1 = create_and_register_resource("{rtype}")
5787            resource2, rname2 = create_and_register_resource("{rtype}")
5788
5789            os.write({w}, rname1.encode("ascii") + b"\\n")
5790            os.write({w}, rname2.encode("ascii") + b"\\n")
5791
5792            time.sleep(10)
5793        '''
5794        for rtype in resource_tracker._CLEANUP_FUNCS:
5795            with self.subTest(rtype=rtype):
5796                if rtype in ("noop", "dummy"):
5797                    # Artefact resource type used by the resource_tracker
5798                    # or tests
5799                    continue
5800                r, w = os.pipe()
5801                p = subprocess.Popen([sys.executable,
5802                                     '-E', '-c', cmd.format(w=w, rtype=rtype)],
5803                                     pass_fds=[w],
5804                                     stderr=subprocess.PIPE)
5805                os.close(w)
5806                with open(r, 'rb', closefd=True) as f:
5807                    name1 = f.readline().rstrip().decode('ascii')
5808                    name2 = f.readline().rstrip().decode('ascii')
5809                _resource_unlink(name1, rtype)
5810                p.terminate()
5811                p.wait()
5812
5813                err_msg = (f"A {rtype} resource was leaked after a process was "
5814                           f"abruptly terminated")
5815                for _ in support.sleeping_retry(support.SHORT_TIMEOUT,
5816                                                  err_msg):
5817                    try:
5818                        _resource_unlink(name2, rtype)
5819                    except OSError as e:
5820                        # docs say it should be ENOENT, but macOS seems
5821                        # to give EINVAL
5822                        self.assertIn(e.errno, (errno.ENOENT, errno.EINVAL))
5823                        break
5824
5825                err = p.stderr.read().decode('utf-8')
5826                p.stderr.close()
5827                expected = (
5828                    'resource_tracker: There appear to be 2 leaked '
5829                    f'{rtype} objects')
5830                self.assertRegex(err, expected)
5831                self.assertRegex(err, r'resource_tracker: %r: \[Errno' % name1)
5832
5833    def check_resource_tracker_death(self, signum, should_die):
5834        # bpo-31310: if the semaphore tracker process has died, it should
5835        # be restarted implicitly.
5836        from multiprocessing.resource_tracker import _resource_tracker
5837        pid = _resource_tracker._pid
5838        if pid is not None:
5839            os.kill(pid, signal.SIGKILL)
5840            support.wait_process(pid, exitcode=-signal.SIGKILL)
5841        with warnings.catch_warnings():
5842            warnings.simplefilter("ignore")
5843            _resource_tracker.ensure_running()
5844        pid = _resource_tracker._pid
5845
5846        os.kill(pid, signum)
5847        time.sleep(1.0)  # give it time to die
5848
5849        ctx = multiprocessing.get_context("spawn")
5850        with warnings.catch_warnings(record=True) as all_warn:
5851            warnings.simplefilter("always")
5852            sem = ctx.Semaphore()
5853            sem.acquire()
5854            sem.release()
5855            wr = weakref.ref(sem)
5856            # ensure `sem` gets collected, which triggers communication with
5857            # the semaphore tracker
5858            del sem
5859            gc.collect()
5860            self.assertIsNone(wr())
5861            if should_die:
5862                self.assertEqual(len(all_warn), 1)
5863                the_warn = all_warn[0]
5864                self.assertTrue(issubclass(the_warn.category, UserWarning))
5865                self.assertTrue("resource_tracker: process died"
5866                                in str(the_warn.message))
5867            else:
5868                self.assertEqual(len(all_warn), 0)
5869
5870    def test_resource_tracker_sigint(self):
5871        # Catchable signal (ignored by semaphore tracker)
5872        self.check_resource_tracker_death(signal.SIGINT, False)
5873
5874    def test_resource_tracker_sigterm(self):
5875        # Catchable signal (ignored by semaphore tracker)
5876        self.check_resource_tracker_death(signal.SIGTERM, False)
5877
5878    @unittest.skipIf(sys.platform.startswith("netbsd"),
5879                     "gh-125620: Skip on NetBSD due to long wait for SIGKILL process termination.")
5880    def test_resource_tracker_sigkill(self):
5881        # Uncatchable signal.
5882        self.check_resource_tracker_death(signal.SIGKILL, True)
5883
5884    @staticmethod
5885    def _is_resource_tracker_reused(conn, pid):
5886        from multiprocessing.resource_tracker import _resource_tracker
5887        _resource_tracker.ensure_running()
5888        # The pid should be None in the child process, except for the
5889        # fork context. It should not be a new value.
5890        reused = _resource_tracker._pid in (None, pid)
5891        reused &= _resource_tracker._check_alive()
5892        conn.send(reused)
5893
5894    def test_resource_tracker_reused(self):
5895        from multiprocessing.resource_tracker import _resource_tracker
5896        _resource_tracker.ensure_running()
5897        pid = _resource_tracker._pid
5898
5899        r, w = multiprocessing.Pipe(duplex=False)
5900        p = multiprocessing.Process(target=self._is_resource_tracker_reused,
5901                                    args=(w, pid))
5902        p.start()
5903        is_resource_tracker_reused = r.recv()
5904
5905        # Clean up
5906        p.join()
5907        w.close()
5908        r.close()
5909
5910        self.assertTrue(is_resource_tracker_reused)
5911
5912    def test_too_long_name_resource(self):
5913        # gh-96819: Resource names that will make the length of a write to a pipe
5914        # greater than PIPE_BUF are not allowed
5915        rtype = "shared_memory"
5916        too_long_name_resource = "a" * (512 - len(rtype))
5917        with self.assertRaises(ValueError):
5918            resource_tracker.register(too_long_name_resource, rtype)
5919
5920    def _test_resource_tracker_leak_resources(self, cleanup):
5921        # We use a separate instance for testing, since the main global
5922        # _resource_tracker may be used to watch test infrastructure.
5923        from multiprocessing.resource_tracker import ResourceTracker
5924        tracker = ResourceTracker()
5925        tracker.ensure_running()
5926        self.assertTrue(tracker._check_alive())
5927
5928        self.assertIsNone(tracker._exitcode)
5929        tracker.register('somename', 'dummy')
5930        if cleanup:
5931            tracker.unregister('somename', 'dummy')
5932            expected_exit_code = 0
5933        else:
5934            expected_exit_code = 1
5935
5936        self.assertTrue(tracker._check_alive())
5937        self.assertIsNone(tracker._exitcode)
5938        tracker._stop()
5939        self.assertEqual(tracker._exitcode, expected_exit_code)
5940
5941    def test_resource_tracker_exit_code(self):
5942        """
5943        Test the exit code of the resource tracker.
5944
5945        If no leaked resources were found, exit code should be 0, otherwise 1
5946        """
5947        for cleanup in [True, False]:
5948            with self.subTest(cleanup=cleanup):
5949                self._test_resource_tracker_leak_resources(
5950                    cleanup=cleanup,
5951                )
5952
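# An illustrative sketch (not part of the test suite) of the private,
# POSIX-only API used above: register() asks the tracker process to clean
# up a named resource if this process dies without doing so itself, and
# unregister() withdraws that request.  'dummy' is the no-op resource type
# the tests above use; the helper name is hypothetical.
def _example_tracker_roundtrip():
    resource_tracker.register('example-name', 'dummy')
    resource_tracker.unregister('example-name', 'dummy')
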
5953class TestSimpleQueue(unittest.TestCase):
5954
5955    @classmethod
5956    def _test_empty(cls, queue, child_can_start, parent_can_continue):
5957        child_can_start.wait()
5958        # issue 30301, could fail under spawn and forkserver
5959        try:
5960            queue.put(queue.empty())
5961            queue.put(queue.empty())
5962        finally:
5963            parent_can_continue.set()
5964
5965    def test_empty_exceptions(self):
5966        # Assert that checking emptiness of a closed queue raises
5967        # an OSError, independently of whether the queue was used
5968        # or not. This differs from Queue and JoinableQueue.
5969        q = multiprocessing.SimpleQueue()
5970        q.close()  # close the pipe
5971        with self.assertRaisesRegex(OSError, 'is closed'):
5972            q.empty()
5973
5974    def test_empty(self):
5975        queue = multiprocessing.SimpleQueue()
5976        child_can_start = multiprocessing.Event()
5977        parent_can_continue = multiprocessing.Event()
5978
5979        proc = multiprocessing.Process(
5980            target=self._test_empty,
5981            args=(queue, child_can_start, parent_can_continue)
5982        )
5983        proc.daemon = True
5984        proc.start()
5985
5986        self.assertTrue(queue.empty())
5987
5988        child_can_start.set()
5989        parent_can_continue.wait()
5990
5991        self.assertFalse(queue.empty())
5992        self.assertEqual(queue.get(), True)
5993        self.assertEqual(queue.get(), False)
5994        self.assertTrue(queue.empty())
5995
5996        proc.join()
5997
5998    def test_close(self):
5999        queue = multiprocessing.SimpleQueue()
6000        queue.close()
6001        # closing a queue twice should not fail
6002        queue.close()
6003
6004    # Test specific to CPython since it tests private attributes
6005    @test.support.cpython_only
6006    def test_closed(self):
6007        queue = multiprocessing.SimpleQueue()
6008        queue.close()
6009        self.assertTrue(queue._reader.closed)
6010        self.assertTrue(queue._writer.closed)
6011
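# An illustrative sketch (not part of the test suite) of the SimpleQueue
# contract tested above: it wraps a bare pipe, so after close() even
# empty() raises OSError rather than returning a value.  The helper name
# is hypothetical.
def _example_simplequeue_basics():
    q = multiprocessing.SimpleQueue()
    q.put('item')
    assert not q.empty()
    assert q.get() == 'item'
    q.close()  # empty()/get()/put() now raise OSError
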
6012
6013class TestPoolNotLeakOnFailure(unittest.TestCase):
6014
6015    def test_release_unused_processes(self):
6016        # Issue #19675: During pool creation, if we can't create a process,
6017        # don't leak already created ones.
6018        will_fail_in = 3
6019        forked_processes = []
6020
6021        class FailingForkProcess:
6022            def __init__(self, **kwargs):
6023                self.name = 'Fake Process'
6024                self.exitcode = None
6025                self.state = None
6026                forked_processes.append(self)
6027
6028            def start(self):
6029                nonlocal will_fail_in
6030                if will_fail_in <= 0:
6031                    raise OSError("Manually induced OSError")
6032                will_fail_in -= 1
6033                self.state = 'started'
6034
6035            def terminate(self):
6036                self.state = 'stopping'
6037
6038            def join(self):
6039                if self.state == 'stopping':
6040                    self.state = 'stopped'
6041
6042            def is_alive(self):
6043                return self.state == 'started' or self.state == 'stopping'
6044
6045        with self.assertRaisesRegex(OSError, 'Manually induced OSError'):
6046            p = multiprocessing.pool.Pool(5, context=unittest.mock.MagicMock(
6047                Process=FailingForkProcess))
6048            p.close()
6049            p.join()
6050        self.assertFalse(
6051            any(process.is_alive() for process in forked_processes))
6052
6053
6054@hashlib_helper.requires_hashdigest('sha256')
6055class TestSyncManagerTypes(unittest.TestCase):
6056    """Test all the types which can be shared between a parent and a
6057    child process by using a manager which acts as an intermediary
6058    between them.
6059
6060    In the following unit tests the base type is created in the parent
6061    process, the @classmethod represents the worker process, and the
6062    shared object is readable and writable by both.
6063
6064    # The child.
6065    @classmethod
6066    def _test_list(cls, obj):
6067        assert obj[0] == 5
6068        obj.append(6)
6069
6070    # The parent.
6071    def test_list(self):
6072        o = self.manager.list()
6073        o.append(5)
6074        self.run_worker(self._test_list, o)
6075        assert o[1] == 6
6076    """
6077    manager_class = multiprocessing.managers.SyncManager
6078
6079    def setUp(self):
6080        self.manager = self.manager_class()
6081        self.manager.start()
6082        self.proc = None
6083
6084    def tearDown(self):
6085        if self.proc is not None and self.proc.is_alive():
6086            self.proc.terminate()
6087            self.proc.join()
6088        self.manager.shutdown()
6089        self.manager = None
6090        self.proc = None
6091
6092    @classmethod
6093    def setUpClass(cls):
6094        support.reap_children()
6095
6096    tearDownClass = setUpClass

    def wait_proc_exit(self):
        # Only the manager process should be returned by active_children()
        # but this can take a bit on slow machines, so wait a few seconds
        # if there are other children too (see #17395).
        join_process(self.proc)

        timeout = WAIT_ACTIVE_CHILDREN_TIMEOUT
        start_time = time.monotonic()
        for _ in support.sleeping_retry(timeout, error=False):
            if len(multiprocessing.active_children()) <= 1:
                break
        else:
            dt = time.monotonic() - start_time
            support.environment_altered = True
            support.print_warning(f"multiprocessing.Manager still has "
                                  f"{multiprocessing.active_children()} "
                                  f"active children after {dt:.1f} seconds")

    def run_worker(self, worker, obj):
        self.proc = multiprocessing.Process(target=worker, args=(obj,))
        self.proc.daemon = True
        self.proc.start()
        self.wait_proc_exit()
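        # Plain asserts in the worker raise on failure, making the child
        # exit with a non-zero code that the check below catches.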
        self.assertEqual(self.proc.exitcode, 0)

    @classmethod
    def _test_event(cls, obj):
        assert obj.is_set()
        obj.wait()
        obj.clear()
        obj.wait(0.001)

    def test_event(self):
        o = self.manager.Event()
        o.set()
        self.run_worker(self._test_event, o)
        assert not o.is_set()
        o.wait(0.001)

    @classmethod
    def _test_lock(cls, obj):
        obj.acquire()

    def test_lock(self, lname="Lock"):
        o = getattr(self.manager, lname)()
        self.run_worker(self._test_lock, o)
        o.release()
        self.assertRaises(RuntimeError, o.release)  # already released

    @classmethod
    def _test_rlock(cls, obj):
        obj.acquire()
        obj.release()

    def test_rlock(self, lname="RLock"):
        o = getattr(self.manager, lname)()
        self.run_worker(self._test_rlock, o)

    @classmethod
    def _test_semaphore(cls, obj):
        obj.acquire()

    def test_semaphore(self, sname="Semaphore"):
        o = getattr(self.manager, sname)()
        self.run_worker(self._test_semaphore, o)
        o.release()

    def test_bounded_semaphore(self):
        self.test_semaphore(sname="BoundedSemaphore")

    @classmethod
    def _test_condition(cls, obj):
        obj.acquire()
        obj.release()

    def test_condition(self):
        o = self.manager.Condition()
        self.run_worker(self._test_condition, o)

    @classmethod
    def _test_barrier(cls, obj):
        assert obj.parties == 5
        obj.reset()

    def test_barrier(self):
        o = self.manager.Barrier(5)
        self.run_worker(self._test_barrier, o)

    @classmethod
    def _test_pool(cls, obj):
        # TODO: fix https://bugs.python.org/issue35919
        with obj:
            pass

    def test_pool(self):
        o = self.manager.Pool(processes=4)
        self.run_worker(self._test_pool, o)

    @classmethod
    def _test_queue(cls, obj):
        assert obj.qsize() == 2
        assert obj.full()
        assert not obj.empty()
        assert obj.get() == 5
        assert not obj.empty()
        assert obj.get() == 6
        assert obj.empty()

    def test_queue(self, qname="Queue"):
        o = getattr(self.manager, qname)(2)
        o.put(5)
        o.put(6)
        self.run_worker(self._test_queue, o)
        assert o.empty()
        assert not o.full()

    def test_joinable_queue(self):
        self.test_queue("JoinableQueue")

    @classmethod
    def _test_list(cls, obj):
        case = unittest.TestCase()
        case.assertEqual(obj[0], 5)
        case.assertEqual(obj.count(5), 1)
        case.assertEqual(obj.index(5), 0)
        obj.sort()
        obj.reverse()
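        # iterating over the proxy must work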
        for x in obj:
            pass
        case.assertEqual(len(obj), 1)
        case.assertEqual(obj.pop(0), 5)

    def test_list(self):
        o = self.manager.list()
        o.append(5)
        self.run_worker(self._test_list, o)
        self.assertIsNotNone(o)
        self.assertEqual(len(o), 0)

    @classmethod
    def _test_dict(cls, obj):
        case = unittest.TestCase()
        case.assertEqual(len(obj), 1)
        case.assertEqual(obj['foo'], 5)
        case.assertEqual(obj.get('foo'), 5)
        case.assertListEqual(list(obj.items()), [('foo', 5)])
        case.assertListEqual(list(obj.keys()), ['foo'])
        case.assertListEqual(list(obj.values()), [5])
        case.assertDictEqual(obj.copy(), {'foo': 5})
        case.assertTupleEqual(obj.popitem(), ('foo', 5))

    def test_dict(self):
        o = self.manager.dict()
        o['foo'] = 5
        self.run_worker(self._test_dict, o)
        self.assertIsNotNone(o)
        self.assertEqual(len(o), 0)

    @classmethod
    def _test_value(cls, obj):
        case = unittest.TestCase()
        case.assertEqual(obj.value, 1)
        case.assertEqual(obj.get(), 1)
        obj.set(2)

    def test_value(self):
        o = self.manager.Value('i', 1)
        self.run_worker(self._test_value, o)
        self.assertEqual(o.value, 2)
        self.assertEqual(o.get(), 2)

    @classmethod
    def _test_array(cls, obj):
        case = unittest.TestCase()
        case.assertEqual(obj[0], 0)
        case.assertEqual(obj[1], 1)
        case.assertEqual(len(obj), 2)
        case.assertListEqual(list(obj), [0, 1])

    def test_array(self):
        o = self.manager.Array('i', [0, 1])
        self.run_worker(self._test_array, o)

    @classmethod
    def _test_namespace(cls, obj):
        case = unittest.TestCase()
        case.assertEqual(obj.x, 0)
        case.assertEqual(obj.y, 1)

    def test_namespace(self):
        o = self.manager.Namespace()
        o.x = 0
        o.y = 1
        self.run_worker(self._test_namespace, o)


class TestNamedResource(unittest.TestCase):
    @only_run_in_spawn_testsuite("spawn specific test.")
    def test_global_named_resource_spawn(self):
        #
        # gh-90549: Check that named resources created at module level in
        # the main module are not reported as leaked by a subprocess under
        # the spawn start method.
        #
        testfn = os_helper.TESTFN
        self.addCleanup(os_helper.unlink, testfn)
        with open(testfn, 'w', encoding='utf-8') as f:
            f.write(textwrap.dedent('''\
                import multiprocessing as mp
                ctx = mp.get_context('spawn')
                global_resource = ctx.Semaphore()
                def submain(): pass
                if __name__ == '__main__':
                    p = ctx.Process(target=submain)
                    p.start()
                    p.join()
            '''))
        rc, out, err = script_helper.assert_python_ok(testfn)
        # on error, err = 'UserWarning: resource_tracker: There appear to
        # be 1 leaked semaphore objects to clean up at shutdown'
        self.assertFalse(err, msg=err.decode('utf-8'))


class _TestAtExit(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _write_file_at_exit(cls, output_path):
        import atexit
        def exit_handler():
            with open(output_path, 'w') as f:
                f.write("deadbeef")
        atexit.register(exit_handler)

    def test_atexit(self):
        # gh-83856
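        # The handler is registered inside the child; the file contents
        # checked below prove the child ran its atexit callbacks on exit.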
        with os_helper.temp_dir() as temp_dir:
            output_path = os.path.join(temp_dir, 'output.txt')
            p = self.Process(target=self._write_file_at_exit, args=(output_path,))
            p.start()
            p.join()
            with open(output_path) as f:
                self.assertEqual(f.read(), 'deadbeef')


class _TestSpawnedSysPath(BaseTestCase):
    """Test that sys.path is set up in forkserver and spawn processes."""

    ALLOWED_TYPES = {'processes'}
    # Not applicable to fork, which inherits everything from the parent
    # process as-is.
    START_METHODS = {"forkserver", "spawn"}

    def setUp(self):
        self._orig_sys_path = list(sys.path)
        self._temp_dir = tempfile.mkdtemp(prefix="test_sys_path-")
        self._mod_name = "unique_test_mod"
        module_path = os.path.join(self._temp_dir, f"{self._mod_name}.py")
        with open(module_path, "w", encoding="utf-8") as mod:
            mod.write("# A simple test module\n")
        sys.path[:] = [p for p in sys.path if p]  # remove any existing ""s
        sys.path.insert(0, self._temp_dir)
        sys.path.insert(0, "")  # Replaced with an abspath in child.
        self.assertIn(self.start_method, self.START_METHODS)
        self._ctx = multiprocessing.get_context(self.start_method)

    def tearDown(self):
        sys.path[:] = self._orig_sys_path
        shutil.rmtree(self._temp_dir, ignore_errors=True)

    @staticmethod
    def enq_imported_module_names(queue):
        queue.put(tuple(sys.modules))

    def test_forkserver_preload_imports_sys_path(self):
        if self._ctx.get_start_method() != "forkserver":
            self.skipTest("forkserver specific test.")
        self.assertNotIn(self._mod_name, sys.modules)
        multiprocessing.forkserver._forkserver._stop()  # Must be fresh.
        self._ctx.set_forkserver_preload(
            ["test.test_multiprocessing_forkserver", self._mod_name])
        q = self._ctx.Queue()
        proc = self._ctx.Process(
                target=self.enq_imported_module_names, args=(q,))
        proc.start()
        proc.join()
        child_imported_modules = q.get()
        q.close()
        self.assertIn(self._mod_name, child_imported_modules)

    @staticmethod
    def enq_sys_path_and_import(queue, mod_name):
        queue.put(sys.path)
        try:
            importlib.import_module(mod_name)
        except ImportError as exc:
            queue.put(exc)
        else:
            queue.put(None)

    def test_child_sys_path(self):
        q = self._ctx.Queue()
        proc = self._ctx.Process(
                target=self.enq_sys_path_and_import, args=(q, self._mod_name))
        proc.start()
        proc.join()
        child_sys_path = q.get()
        import_error = q.get()
        q.close()
        self.assertNotIn("", child_sys_path)  # replaced by an abspath
        self.assertIn(self._temp_dir, child_sys_path)  # our addition
        # ignore the first element; it is the absolute "" replacement
        self.assertEqual(child_sys_path[1:], sys.path[1:])
        self.assertIsNone(import_error, msg=f"child could not import {self._mod_name}")


class MiscTestCase(unittest.TestCase):
    def test__all__(self):
        # Just make sure names in not_exported are excluded
        support.check__all__(self, multiprocessing, extra=multiprocessing.__all__,
                             not_exported=['SUBDEBUG', 'SUBWARNING'])

    @only_run_in_spawn_testsuite("avoids redundant testing.")
    def test_spawn_sys_executable_none_allows_import(self):
        # Regression test for the bug tracked in
        # https://github.com/python/cpython/issues/90876, which caused an
        # ImportError in multiprocessing when sys.executable was None.
        # This can happen in embedded environments.
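        # The "if 1:" wrapper lets an indented block be passed verbatim
        # through -c.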
        rc, out, err = script_helper.assert_python_ok(
            "-c",
            """if 1:
            import sys
            sys.executable = None
            assert "multiprocessing" not in sys.modules, "already imported!"
            import multiprocessing
            import multiprocessing.spawn  # This should not fail\n""",
        )
        self.assertEqual(rc, 0)
        self.assertFalse(err, msg=err.decode('utf-8'))

    def test_large_pool(self):
        #
        # gh-89240: Check that a large pool (200 workers here) works
        #
        testfn = os_helper.TESTFN
        self.addCleanup(os_helper.unlink, testfn)
        with open(testfn, 'w', encoding='utf-8') as f:
            f.write(textwrap.dedent('''\
                import multiprocessing
                def f(x): return x*x
                if __name__ == '__main__':
                    with multiprocessing.Pool(200) as p:
                        print(sum(p.map(f, range(1000))))
            '''))
        rc, out, err = script_helper.assert_python_ok(testfn)
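        # 332833500 == sum(x*x for x in range(1000))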
        self.assertEqual("332833500", out.decode('utf-8').strip())
        self.assertFalse(err, msg=err.decode('utf-8'))


#
# Mixins
#

class BaseMixin(object):
    @classmethod
    def setUpClass(cls):
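        # Snapshot currently dangling processes and threads so that
        # tearDownClass() reports only those leaked while this class ran.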
        cls.dangling = (multiprocessing.process._dangling.copy(),
                        threading._dangling.copy())

    @classmethod
    def tearDownClass(cls):
        # bpo-26762: Some multiprocessing objects like Pool create reference
        # cycles. Trigger a garbage collection to break these cycles.
        test.support.gc_collect()

        processes = set(multiprocessing.process._dangling) - set(cls.dangling[0])
        if processes:
            test.support.environment_altered = True
            support.print_warning(f'Dangling processes: {processes}')
        processes = None

        threads = set(threading._dangling) - set(cls.dangling[1])
        if threads:
            test.support.environment_altered = True
            support.print_warning(f'Dangling threads: {threads}')
        threads = None


class ProcessesMixin(BaseMixin):
    TYPE = 'processes'
    Process = multiprocessing.Process
    connection = multiprocessing.connection
    current_process = staticmethod(multiprocessing.current_process)
    parent_process = staticmethod(multiprocessing.parent_process)
    active_children = staticmethod(multiprocessing.active_children)
    set_executable = staticmethod(multiprocessing.set_executable)
    Pool = staticmethod(multiprocessing.Pool)
    Pipe = staticmethod(multiprocessing.Pipe)
    Queue = staticmethod(multiprocessing.Queue)
    JoinableQueue = staticmethod(multiprocessing.JoinableQueue)
    Lock = staticmethod(multiprocessing.Lock)
    RLock = staticmethod(multiprocessing.RLock)
    Semaphore = staticmethod(multiprocessing.Semaphore)
    BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
    Condition = staticmethod(multiprocessing.Condition)
    Event = staticmethod(multiprocessing.Event)
    Barrier = staticmethod(multiprocessing.Barrier)
    Value = staticmethod(multiprocessing.Value)
    Array = staticmethod(multiprocessing.Array)
    RawValue = staticmethod(multiprocessing.RawValue)
    RawArray = staticmethod(multiprocessing.RawArray)


class ManagerMixin(BaseMixin):
    TYPE = 'manager'
    Process = multiprocessing.Process
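    # A dotted attrgetter applied to the test instance resolves to
    # self.manager.<name>, so each property below proxies to the manager
    # started in setUpClass().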
    Queue = property(operator.attrgetter('manager.Queue'))
    JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))
    Lock = property(operator.attrgetter('manager.Lock'))
    RLock = property(operator.attrgetter('manager.RLock'))
    Semaphore = property(operator.attrgetter('manager.Semaphore'))
    BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
    Condition = property(operator.attrgetter('manager.Condition'))
    Event = property(operator.attrgetter('manager.Event'))
    Barrier = property(operator.attrgetter('manager.Barrier'))
    Value = property(operator.attrgetter('manager.Value'))
    Array = property(operator.attrgetter('manager.Array'))
    list = property(operator.attrgetter('manager.list'))
    dict = property(operator.attrgetter('manager.dict'))
    Namespace = property(operator.attrgetter('manager.Namespace'))

    @classmethod
    def Pool(cls, *args, **kwds):
        return cls.manager.Pool(*args, **kwds)

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        cls.manager = multiprocessing.Manager()

    @classmethod
    def tearDownClass(cls):
        # only the manager process should be returned by active_children()
        # but this can take a bit on slow machines, so wait a few seconds
        # if there are other children too (see #17395)
        timeout = WAIT_ACTIVE_CHILDREN_TIMEOUT
        start_time = time.monotonic()
        for _ in support.sleeping_retry(timeout, error=False):
            if len(multiprocessing.active_children()) <= 1:
                break
        else:
            dt = time.monotonic() - start_time
            support.environment_altered = True
            support.print_warning(f"multiprocessing.Manager still has "
                                  f"{multiprocessing.active_children()} "
                                  f"active children after {dt:.1f} seconds")

        gc.collect()                       # do garbage collection
        if cls.manager._number_of_objects() != 0:
            # This is not really an error since some tests do not
            # ensure that all processes which hold a reference to a
            # managed object have been joined.
            test.support.environment_altered = True
            support.print_warning('Shared objects which still exist '
                                  'at manager shutdown:')
            support.print_warning(cls.manager._debug_info())
        cls.manager.shutdown()
        cls.manager.join()
        cls.manager = None

        super().tearDownClass()


class ThreadsMixin(BaseMixin):
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    connection = multiprocessing.dummy.connection
    current_process = staticmethod(multiprocessing.dummy.current_process)
    active_children = staticmethod(multiprocessing.dummy.active_children)
    Pool = staticmethod(multiprocessing.dummy.Pool)
    Pipe = staticmethod(multiprocessing.dummy.Pipe)
    Queue = staticmethod(multiprocessing.dummy.Queue)
    JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)
    Lock = staticmethod(multiprocessing.dummy.Lock)
    RLock = staticmethod(multiprocessing.dummy.RLock)
    Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
    BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
    Condition = staticmethod(multiprocessing.dummy.Condition)
    Event = staticmethod(multiprocessing.dummy.Event)
    Barrier = staticmethod(multiprocessing.dummy.Barrier)
    Value = staticmethod(multiprocessing.dummy.Value)
    Array = staticmethod(multiprocessing.dummy.Array)

#
# Functions used to create test cases from the base ones in this module
#

def install_tests_in_module_dict(remote_globs, start_method,
                                 only_type=None, exclude_types=False):
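    # Stamp concrete TestCase subclasses into the calling test module's
    # globals so unittest discovery finds one variant per mixin type for
    # this start method.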
    __module__ = remote_globs['__name__']
    local_globs = globals()
    ALL_TYPES = {'processes', 'threads', 'manager'}

    for name, base in local_globs.items():
        if not isinstance(base, type):
            continue
        if issubclass(base, BaseTestCase):
            if base is BaseTestCase:
                continue
            assert set(base.ALLOWED_TYPES) <= ALL_TYPES, base.ALLOWED_TYPES
            if base.START_METHODS and start_method not in base.START_METHODS:
                continue  # class not intended for this start method.
            for type_ in base.ALLOWED_TYPES:
                if only_type and type_ != only_type:
                    continue
                if exclude_types:
                    continue
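                # Base classes are named '_TestXxx'; the generated class is
                # exposed as e.g. 'WithProcessesTestXxx'.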
                newname = 'With' + type_.capitalize() + name[1:]
                Mixin = local_globs[type_.capitalize() + 'Mixin']
                class Temp(base, Mixin, unittest.TestCase):
                    pass
                if type_ == 'manager':
                    Temp = hashlib_helper.requires_hashdigest('sha256')(Temp)
                Temp.__name__ = Temp.__qualname__ = newname
                Temp.__module__ = __module__
                Temp.start_method = start_method
                remote_globs[newname] = Temp
        elif issubclass(base, unittest.TestCase):
            if only_type:
                continue

            class Temp(base, object):
                pass
            Temp.__name__ = Temp.__qualname__ = name
            Temp.__module__ = __module__
            remote_globs[name] = Temp

    dangling = [None, None]
    old_start_method = [None]

    def setUpModule():
        multiprocessing.set_forkserver_preload(PRELOAD)
        multiprocessing.process._cleanup()
        dangling[0] = multiprocessing.process._dangling.copy()
        dangling[1] = threading._dangling.copy()
        old_start_method[0] = multiprocessing.get_start_method(allow_none=True)
        try:
            multiprocessing.set_start_method(start_method, force=True)
        except ValueError:
            raise unittest.SkipTest(start_method +
                                    ' start method not supported')

        if sys.platform.startswith("linux"):
            try:
                lock = multiprocessing.RLock()
            except OSError:
                raise unittest.SkipTest("OSError raised on RLock creation, "
                                        "see issue 3111!")
        check_enough_semaphores()
        util.get_temp_dir()     # creates temp directory
        multiprocessing.get_logger().setLevel(LOG_LEVEL)

    def tearDownModule():
        need_sleep = False

        # bpo-26762: Some multiprocessing objects like Pool create reference
        # cycles. Trigger a garbage collection to break these cycles.
        test.support.gc_collect()

        multiprocessing.set_start_method(old_start_method[0], force=True)
        # Detect dangling processes and threads; if any are found, pause
        # below so we don't get warnings about them.
        processes = set(multiprocessing.process._dangling) - set(dangling[0])
        if processes:
            need_sleep = True
            test.support.environment_altered = True
            support.print_warning(f'Dangling processes: {processes}')
        processes = None

        threads = set(threading._dangling) - set(dangling[1])
        if threads:
            need_sleep = True
            test.support.environment_altered = True
            support.print_warning(f'Dangling threads: {threads}')
        threads = None

        # Sleep 500 ms to give child processes time to complete.
        if need_sleep:
            time.sleep(0.5)

        multiprocessing.util._cleanup_tests()

    remote_globs['setUpModule'] = setUpModule
    remote_globs['tearDownModule'] = tearDownModule


@unittest.skipIf(not hasattr(_multiprocessing, 'SemLock'), 'SemLock not available')
@unittest.skipIf(sys.platform != "linux", "Linux only")
class SemLockTests(unittest.TestCase):

    def test_semlock_subclass(self):
        class SemLock(_multiprocessing.SemLock):
            pass
        name = f'test_semlock_subclass-{os.getpid()}'
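        # SemLock(kind, value, maxvalue, name, unlink); instantiating the
        # subclass is the test itself: it must not crash.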
        s = SemLock(1, 0, 10, name, False)
        _multiprocessing.sem_unlink(name)