• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Run the _testcapi module tests (tests for the Python/C API):  by defn,
2# these are all functions _testcapi exports whose name begins with 'test_'.
3
4from collections import OrderedDict
5import os
6import pickle
7import random
8import re
9import subprocess
10import sys
11import textwrap
12import threading
13import time
14import unittest
15import weakref
16import importlib.machinery
17import importlib.util
18from test import support
19from test.support import MISSING_C_DOCSTRINGS
20from test.support.script_helper import assert_python_failure, assert_python_ok
21try:
22    import _posixsubprocess
23except ImportError:
24    _posixsubprocess = None
25
26# Skip this test if the _testcapi module isn't available.
27_testcapi = support.import_module('_testcapi')
28
29import _testinternalcapi
30
31# Were we compiled --with-pydebug or with #define Py_DEBUG?
32Py_DEBUG = hasattr(sys, 'gettotalrefcount')
33
34
35def testfunction(self):
36    """some doc"""
37    return self
38
39
40class InstanceMethod:
41    id = _testcapi.instancemethod(id)
42    testfunction = _testcapi.instancemethod(testfunction)
43
44class CAPITest(unittest.TestCase):
45
46    def test_instancemethod(self):
47        inst = InstanceMethod()
48        self.assertEqual(id(inst), inst.id())
49        self.assertTrue(inst.testfunction() is inst)
50        self.assertEqual(inst.testfunction.__doc__, testfunction.__doc__)
51        self.assertEqual(InstanceMethod.testfunction.__doc__, testfunction.__doc__)
52
53        InstanceMethod.testfunction.attribute = "test"
54        self.assertEqual(testfunction.attribute, "test")
55        self.assertRaises(AttributeError, setattr, inst.testfunction, "attribute", "test")
56
57    def test_no_FatalError_infinite_loop(self):
58        with support.SuppressCrashReport():
59            p = subprocess.Popen([sys.executable, "-c",
60                                  'import _testcapi;'
61                                  '_testcapi.crash_no_current_thread()'],
62                                 stdout=subprocess.PIPE,
63                                 stderr=subprocess.PIPE)
64        (out, err) = p.communicate()
65        self.assertEqual(out, b'')
66        # This used to cause an infinite loop.
67        self.assertTrue(err.rstrip().startswith(
68                         b'Fatal Python error: '
69                         b'PyThreadState_Get: '
70                         b'the function must be called with the GIL held, '
71                         b'but the GIL is released '
72                         b'(the current Python thread state is NULL)'),
73                        err)
74
75    def test_memoryview_from_NULL_pointer(self):
76        self.assertRaises(ValueError, _testcapi.make_memoryview_from_NULL_pointer)
77
78    def test_exc_info(self):
79        raised_exception = ValueError("5")
80        new_exc = TypeError("TEST")
81        try:
82            raise raised_exception
83        except ValueError as e:
84            tb = e.__traceback__
85            orig_sys_exc_info = sys.exc_info()
86            orig_exc_info = _testcapi.set_exc_info(new_exc.__class__, new_exc, None)
87            new_sys_exc_info = sys.exc_info()
88            new_exc_info = _testcapi.set_exc_info(*orig_exc_info)
89            reset_sys_exc_info = sys.exc_info()
90
91            self.assertEqual(orig_exc_info[1], e)
92
93            self.assertSequenceEqual(orig_exc_info, (raised_exception.__class__, raised_exception, tb))
94            self.assertSequenceEqual(orig_sys_exc_info, orig_exc_info)
95            self.assertSequenceEqual(reset_sys_exc_info, orig_exc_info)
96            self.assertSequenceEqual(new_exc_info, (new_exc.__class__, new_exc, None))
97            self.assertSequenceEqual(new_sys_exc_info, new_exc_info)
98        else:
99            self.assertTrue(False)
100
101    @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.')
102    def test_seq_bytes_to_charp_array(self):
103        # Issue #15732: crash in _PySequence_BytesToCharpArray()
104        class Z(object):
105            def __len__(self):
106                return 1
107        self.assertRaises(TypeError, _posixsubprocess.fork_exec,
108                          1,Z(),3,(1, 2),5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21)
109        # Issue #15736: overflow in _PySequence_BytesToCharpArray()
110        class Z(object):
111            def __len__(self):
112                return sys.maxsize
113            def __getitem__(self, i):
114                return b'x'
115        self.assertRaises(MemoryError, _posixsubprocess.fork_exec,
116                          1,Z(),3,(1, 2),5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21)
117
118    @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.')
119    def test_subprocess_fork_exec(self):
120        class Z(object):
121            def __len__(self):
122                return 1
123
124        # Issue #15738: crash in subprocess_fork_exec()
125        self.assertRaises(TypeError, _posixsubprocess.fork_exec,
126                          Z(),[b'1'],3,(1, 2),5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21)
127
128    @unittest.skipIf(MISSING_C_DOCSTRINGS,
129                     "Signature information for builtins requires docstrings")
130    def test_docstring_signature_parsing(self):
131
132        self.assertEqual(_testcapi.no_docstring.__doc__, None)
133        self.assertEqual(_testcapi.no_docstring.__text_signature__, None)
134
135        self.assertEqual(_testcapi.docstring_empty.__doc__, None)
136        self.assertEqual(_testcapi.docstring_empty.__text_signature__, None)
137
138        self.assertEqual(_testcapi.docstring_no_signature.__doc__,
139            "This docstring has no signature.")
140        self.assertEqual(_testcapi.docstring_no_signature.__text_signature__, None)
141
142        self.assertEqual(_testcapi.docstring_with_invalid_signature.__doc__,
143            "docstring_with_invalid_signature($module, /, boo)\n"
144            "\n"
145            "This docstring has an invalid signature."
146            )
147        self.assertEqual(_testcapi.docstring_with_invalid_signature.__text_signature__, None)
148
149        self.assertEqual(_testcapi.docstring_with_invalid_signature2.__doc__,
150            "docstring_with_invalid_signature2($module, /, boo)\n"
151            "\n"
152            "--\n"
153            "\n"
154            "This docstring also has an invalid signature."
155            )
156        self.assertEqual(_testcapi.docstring_with_invalid_signature2.__text_signature__, None)
157
158        self.assertEqual(_testcapi.docstring_with_signature.__doc__,
159            "This docstring has a valid signature.")
160        self.assertEqual(_testcapi.docstring_with_signature.__text_signature__, "($module, /, sig)")
161
162        self.assertEqual(_testcapi.docstring_with_signature_but_no_doc.__doc__, None)
163        self.assertEqual(_testcapi.docstring_with_signature_but_no_doc.__text_signature__,
164            "($module, /, sig)")
165
166        self.assertEqual(_testcapi.docstring_with_signature_and_extra_newlines.__doc__,
167            "\nThis docstring has a valid signature and some extra newlines.")
168        self.assertEqual(_testcapi.docstring_with_signature_and_extra_newlines.__text_signature__,
169            "($module, /, parameter)")
170
171    def test_c_type_with_matrix_multiplication(self):
172        M = _testcapi.matmulType
173        m1 = M()
174        m2 = M()
175        self.assertEqual(m1 @ m2, ("matmul", m1, m2))
176        self.assertEqual(m1 @ 42, ("matmul", m1, 42))
177        self.assertEqual(42 @ m1, ("matmul", 42, m1))
178        o = m1
179        o @= m2
180        self.assertEqual(o, ("imatmul", m1, m2))
181        o = m1
182        o @= 42
183        self.assertEqual(o, ("imatmul", m1, 42))
184        o = 42
185        o @= m1
186        self.assertEqual(o, ("matmul", 42, m1))
187
188    def test_c_type_with_ipow(self):
189        # When the __ipow__ method of a type was implemented in C, using the
190        # modulo param would cause segfaults.
191        o = _testcapi.ipowType()
192        self.assertEqual(o.__ipow__(1), (1, None))
193        self.assertEqual(o.__ipow__(2, 2), (2, 2))
194
195    def test_return_null_without_error(self):
196        # Issue #23571: A function must not return NULL without setting an
197        # error
198        if Py_DEBUG:
199            code = textwrap.dedent("""
200                import _testcapi
201                from test import support
202
203                with support.SuppressCrashReport():
204                    _testcapi.return_null_without_error()
205            """)
206            rc, out, err = assert_python_failure('-c', code)
207            self.assertRegex(err.replace(b'\r', b''),
208                             br'Fatal Python error: _Py_CheckFunctionResult: '
209                                br'a function returned NULL '
210                                br'without setting an error\n'
211                             br'Python runtime state: initialized\n'
212                             br'SystemError: <built-in function '
213                                 br'return_null_without_error> returned NULL '
214                                 br'without setting an error\n'
215                             br'\n'
216                             br'Current thread.*:\n'
217                             br'  File .*", line 6 in <module>')
218        else:
219            with self.assertRaises(SystemError) as cm:
220                _testcapi.return_null_without_error()
221            self.assertRegex(str(cm.exception),
222                             'return_null_without_error.* '
223                             'returned NULL without setting an error')
224
225    def test_return_result_with_error(self):
226        # Issue #23571: A function must not return a result with an error set
227        if Py_DEBUG:
228            code = textwrap.dedent("""
229                import _testcapi
230                from test import support
231
232                with support.SuppressCrashReport():
233                    _testcapi.return_result_with_error()
234            """)
235            rc, out, err = assert_python_failure('-c', code)
236            self.assertRegex(err.replace(b'\r', b''),
237                             br'Fatal Python error: _Py_CheckFunctionResult: '
238                                 br'a function returned a result '
239                                 br'with an error set\n'
240                             br'Python runtime state: initialized\n'
241                             br'ValueError\n'
242                             br'\n'
243                             br'The above exception was the direct cause '
244                                br'of the following exception:\n'
245                             br'\n'
246                             br'SystemError: <built-in '
247                                br'function return_result_with_error> '
248                                br'returned a result with an error set\n'
249                             br'\n'
250                             br'Current thread.*:\n'
251                             br'  File .*, line 6 in <module>')
252        else:
253            with self.assertRaises(SystemError) as cm:
254                _testcapi.return_result_with_error()
255            self.assertRegex(str(cm.exception),
256                             'return_result_with_error.* '
257                             'returned a result with an error set')
258
259    def test_buildvalue_N(self):
260        _testcapi.test_buildvalue_N()
261
262    def test_set_nomemory(self):
263        code = """if 1:
264            import _testcapi
265
266            class C(): pass
267
268            # The first loop tests both functions and that remove_mem_hooks()
269            # can be called twice in a row. The second loop checks a call to
270            # set_nomemory() after a call to remove_mem_hooks(). The third
271            # loop checks the start and stop arguments of set_nomemory().
272            for outer_cnt in range(1, 4):
273                start = 10 * outer_cnt
274                for j in range(100):
275                    if j == 0:
276                        if outer_cnt != 3:
277                            _testcapi.set_nomemory(start)
278                        else:
279                            _testcapi.set_nomemory(start, start + 1)
280                    try:
281                        C()
282                    except MemoryError as e:
283                        if outer_cnt != 3:
284                            _testcapi.remove_mem_hooks()
285                        print('MemoryError', outer_cnt, j)
286                        _testcapi.remove_mem_hooks()
287                        break
288        """
289        rc, out, err = assert_python_ok('-c', code)
290        self.assertIn(b'MemoryError 1 10', out)
291        self.assertIn(b'MemoryError 2 20', out)
292        self.assertIn(b'MemoryError 3 30', out)
293
294    def test_mapping_keys_values_items(self):
295        class Mapping1(dict):
296            def keys(self):
297                return list(super().keys())
298            def values(self):
299                return list(super().values())
300            def items(self):
301                return list(super().items())
302        class Mapping2(dict):
303            def keys(self):
304                return tuple(super().keys())
305            def values(self):
306                return tuple(super().values())
307            def items(self):
308                return tuple(super().items())
309        dict_obj = {'foo': 1, 'bar': 2, 'spam': 3}
310
311        for mapping in [{}, OrderedDict(), Mapping1(), Mapping2(),
312                        dict_obj, OrderedDict(dict_obj),
313                        Mapping1(dict_obj), Mapping2(dict_obj)]:
314            self.assertListEqual(_testcapi.get_mapping_keys(mapping),
315                                 list(mapping.keys()))
316            self.assertListEqual(_testcapi.get_mapping_values(mapping),
317                                 list(mapping.values()))
318            self.assertListEqual(_testcapi.get_mapping_items(mapping),
319                                 list(mapping.items()))
320
321    def test_mapping_keys_values_items_bad_arg(self):
322        self.assertRaises(AttributeError, _testcapi.get_mapping_keys, None)
323        self.assertRaises(AttributeError, _testcapi.get_mapping_values, None)
324        self.assertRaises(AttributeError, _testcapi.get_mapping_items, None)
325
326        class BadMapping:
327            def keys(self):
328                return None
329            def values(self):
330                return None
331            def items(self):
332                return None
333        bad_mapping = BadMapping()
334        self.assertRaises(TypeError, _testcapi.get_mapping_keys, bad_mapping)
335        self.assertRaises(TypeError, _testcapi.get_mapping_values, bad_mapping)
336        self.assertRaises(TypeError, _testcapi.get_mapping_items, bad_mapping)
337
338    @unittest.skipUnless(hasattr(_testcapi, 'negative_refcount'),
339                         'need _testcapi.negative_refcount')
340    def test_negative_refcount(self):
341        # bpo-35059: Check that Py_DECREF() reports the correct filename
342        # when calling _Py_NegativeRefcount() to abort Python.
343        code = textwrap.dedent("""
344            import _testcapi
345            from test import support
346
347            with support.SuppressCrashReport():
348                _testcapi.negative_refcount()
349        """)
350        rc, out, err = assert_python_failure('-c', code)
351        self.assertRegex(err,
352                         br'_testcapimodule\.c:[0-9]+: '
353                         br'_Py_NegativeRefcount: Assertion failed: '
354                         br'object has negative ref count')
355
356    def test_trashcan_subclass(self):
357        # bpo-35983: Check that the trashcan mechanism for "list" is NOT
358        # activated when its tp_dealloc is being called by a subclass
359        from _testcapi import MyList
360        L = None
361        for i in range(1000):
362            L = MyList((L,))
363
364    @support.requires_resource('cpu')
365    def test_trashcan_python_class1(self):
366        self.do_test_trashcan_python_class(list)
367
368    @support.requires_resource('cpu')
369    def test_trashcan_python_class2(self):
370        from _testcapi import MyList
371        self.do_test_trashcan_python_class(MyList)
372
373    def do_test_trashcan_python_class(self, base):
374        # Check that the trashcan mechanism works properly for a Python
375        # subclass of a class using the trashcan (this specific test assumes
376        # that the base class "base" behaves like list)
377        class PyList(base):
378            # Count the number of PyList instances to verify that there is
379            # no memory leak
380            num = 0
381            def __init__(self, *args):
382                __class__.num += 1
383                super().__init__(*args)
384            def __del__(self):
385                __class__.num -= 1
386
387        for parity in (0, 1):
388            L = None
389            # We need in the order of 2**20 iterations here such that a
390            # typical 8MB stack would overflow without the trashcan.
391            for i in range(2**20):
392                L = PyList((L,))
393                L.attr = i
394            if parity:
395                # Add one additional nesting layer
396                L = (L,)
397            self.assertGreater(PyList.num, 0)
398            del L
399            self.assertEqual(PyList.num, 0)
400
401    def test_subclass_of_heap_gc_ctype_with_tpdealloc_decrefs_once(self):
402        class HeapGcCTypeSubclass(_testcapi.HeapGcCType):
403            def __init__(self):
404                self.value2 = 20
405                super().__init__()
406
407        subclass_instance = HeapGcCTypeSubclass()
408        type_refcnt = sys.getrefcount(HeapGcCTypeSubclass)
409
410        # Test that subclass instance was fully created
411        self.assertEqual(subclass_instance.value, 10)
412        self.assertEqual(subclass_instance.value2, 20)
413
414        # Test that the type reference count is only decremented once
415        del subclass_instance
416        self.assertEqual(type_refcnt - 1, sys.getrefcount(HeapGcCTypeSubclass))
417
418    def test_subclass_of_heap_gc_ctype_with_del_modifying_dunder_class_only_decrefs_once(self):
419        class A(_testcapi.HeapGcCType):
420            def __init__(self):
421                self.value2 = 20
422                super().__init__()
423
424        class B(A):
425            def __init__(self):
426                super().__init__()
427
428            def __del__(self):
429                self.__class__ = A
430                A.refcnt_in_del = sys.getrefcount(A)
431                B.refcnt_in_del = sys.getrefcount(B)
432
433        subclass_instance = B()
434        type_refcnt = sys.getrefcount(B)
435        new_type_refcnt = sys.getrefcount(A)
436
437        # Test that subclass instance was fully created
438        self.assertEqual(subclass_instance.value, 10)
439        self.assertEqual(subclass_instance.value2, 20)
440
441        del subclass_instance
442
443        # Test that setting __class__ modified the reference counts of the types
444        self.assertEqual(type_refcnt - 1, B.refcnt_in_del)
445        self.assertEqual(new_type_refcnt + 1, A.refcnt_in_del)
446
447        # Test that the original type already has decreased its refcnt
448        self.assertEqual(type_refcnt - 1, sys.getrefcount(B))
449
450        # Test that subtype_dealloc decref the newly assigned __class__ only once
451        self.assertEqual(new_type_refcnt, sys.getrefcount(A))
452
453    def test_heaptype_with_dict(self):
454        inst = _testcapi.HeapCTypeWithDict()
455        inst.foo = 42
456        self.assertEqual(inst.foo, 42)
457        self.assertEqual(inst.dictobj, inst.__dict__)
458        self.assertEqual(inst.dictobj, {"foo": 42})
459
460        inst = _testcapi.HeapCTypeWithDict()
461        self.assertEqual({}, inst.__dict__)
462
463    def test_heaptype_with_negative_dict(self):
464        inst = _testcapi.HeapCTypeWithNegativeDict()
465        inst.foo = 42
466        self.assertEqual(inst.foo, 42)
467        self.assertEqual(inst.dictobj, inst.__dict__)
468        self.assertEqual(inst.dictobj, {"foo": 42})
469
470        inst = _testcapi.HeapCTypeWithNegativeDict()
471        self.assertEqual({}, inst.__dict__)
472
473    def test_heaptype_with_weakref(self):
474        inst = _testcapi.HeapCTypeWithWeakref()
475        ref = weakref.ref(inst)
476        self.assertEqual(ref(), inst)
477        self.assertEqual(inst.weakreflist, ref)
478
479    def test_heaptype_with_buffer(self):
480        inst = _testcapi.HeapCTypeWithBuffer()
481        b = bytes(inst)
482        self.assertEqual(b, b"1234")
483
484    def test_c_subclass_of_heap_ctype_with_tpdealloc_decrefs_once(self):
485        subclass_instance = _testcapi.HeapCTypeSubclass()
486        type_refcnt = sys.getrefcount(_testcapi.HeapCTypeSubclass)
487
488        # Test that subclass instance was fully created
489        self.assertEqual(subclass_instance.value, 10)
490        self.assertEqual(subclass_instance.value2, 20)
491
492        # Test that the type reference count is only decremented once
493        del subclass_instance
494        self.assertEqual(type_refcnt - 1, sys.getrefcount(_testcapi.HeapCTypeSubclass))
495
496    def test_c_subclass_of_heap_ctype_with_del_modifying_dunder_class_only_decrefs_once(self):
497        subclass_instance = _testcapi.HeapCTypeSubclassWithFinalizer()
498        type_refcnt = sys.getrefcount(_testcapi.HeapCTypeSubclassWithFinalizer)
499        new_type_refcnt = sys.getrefcount(_testcapi.HeapCTypeSubclass)
500
501        # Test that subclass instance was fully created
502        self.assertEqual(subclass_instance.value, 10)
503        self.assertEqual(subclass_instance.value2, 20)
504
505        # The tp_finalize slot will set __class__ to HeapCTypeSubclass
506        del subclass_instance
507
508        # Test that setting __class__ modified the reference counts of the types
509        self.assertEqual(type_refcnt - 1, _testcapi.HeapCTypeSubclassWithFinalizer.refcnt_in_del)
510        self.assertEqual(new_type_refcnt + 1, _testcapi.HeapCTypeSubclass.refcnt_in_del)
511
512        # Test that the original type already has decreased its refcnt
513        self.assertEqual(type_refcnt - 1, sys.getrefcount(_testcapi.HeapCTypeSubclassWithFinalizer))
514
515        # Test that subtype_dealloc decref the newly assigned __class__ only once
516        self.assertEqual(new_type_refcnt, sys.getrefcount(_testcapi.HeapCTypeSubclass))
517
518    def test_heaptype_with_setattro(self):
519        obj = _testcapi.HeapCTypeSetattr()
520        self.assertEqual(obj.pvalue, 10)
521        obj.value = 12
522        self.assertEqual(obj.pvalue, 12)
523        del obj.value
524        self.assertEqual(obj.pvalue, 0)
525
526    def test_pynumber_tobase(self):
527        from _testcapi import pynumber_tobase
528        self.assertEqual(pynumber_tobase(123, 2), '0b1111011')
529        self.assertEqual(pynumber_tobase(123, 8), '0o173')
530        self.assertEqual(pynumber_tobase(123, 10), '123')
531        self.assertEqual(pynumber_tobase(123, 16), '0x7b')
532        self.assertEqual(pynumber_tobase(-123, 2), '-0b1111011')
533        self.assertEqual(pynumber_tobase(-123, 8), '-0o173')
534        self.assertEqual(pynumber_tobase(-123, 10), '-123')
535        self.assertEqual(pynumber_tobase(-123, 16), '-0x7b')
536        self.assertRaises(TypeError, pynumber_tobase, 123.0, 10)
537        self.assertRaises(TypeError, pynumber_tobase, '123', 10)
538        self.assertRaises(SystemError, pynumber_tobase, 123, 0)
539
540
541class TestPendingCalls(unittest.TestCase):
542
543    def pendingcalls_submit(self, l, n):
544        def callback():
545            #this function can be interrupted by thread switching so let's
546            #use an atomic operation
547            l.append(None)
548
549        for i in range(n):
550            time.sleep(random.random()*0.02) #0.01 secs on average
551            #try submitting callback until successful.
552            #rely on regular interrupt to flush queue if we are
553            #unsuccessful.
554            while True:
555                if _testcapi._pending_threadfunc(callback):
556                    break;
557
558    def pendingcalls_wait(self, l, n, context = None):
559        #now, stick around until l[0] has grown to 10
560        count = 0;
561        while len(l) != n:
562            #this busy loop is where we expect to be interrupted to
563            #run our callbacks.  Note that callbacks are only run on the
564            #main thread
565            if False and support.verbose:
566                print("(%i)"%(len(l),),)
567            for i in range(1000):
568                a = i*i
569            if context and not context.event.is_set():
570                continue
571            count += 1
572            self.assertTrue(count < 10000,
573                "timeout waiting for %i callbacks, got %i"%(n, len(l)))
574        if False and support.verbose:
575            print("(%i)"%(len(l),))
576
577    def test_pendingcalls_threaded(self):
578
579        #do every callback on a separate thread
580        n = 32 #total callbacks
581        threads = []
582        class foo(object):pass
583        context = foo()
584        context.l = []
585        context.n = 2 #submits per thread
586        context.nThreads = n // context.n
587        context.nFinished = 0
588        context.lock = threading.Lock()
589        context.event = threading.Event()
590
591        threads = [threading.Thread(target=self.pendingcalls_thread,
592                                    args=(context,))
593                   for i in range(context.nThreads)]
594        with support.start_threads(threads):
595            self.pendingcalls_wait(context.l, n, context)
596
597    def pendingcalls_thread(self, context):
598        try:
599            self.pendingcalls_submit(context.l, context.n)
600        finally:
601            with context.lock:
602                context.nFinished += 1
603                nFinished = context.nFinished
604                if False and support.verbose:
605                    print("finished threads: ", nFinished)
606            if nFinished == context.nThreads:
607                context.event.set()
608
609    def test_pendingcalls_non_threaded(self):
610        #again, just using the main thread, likely they will all be dispatched at
611        #once.  It is ok to ask for too many, because we loop until we find a slot.
612        #the loop can be interrupted to dispatch.
613        #there are only 32 dispatch slots, so we go for twice that!
614        l = []
615        n = 64
616        self.pendingcalls_submit(l, n)
617        self.pendingcalls_wait(l, n)
618
619
620class SubinterpreterTest(unittest.TestCase):
621
622    def test_subinterps(self):
623        import builtins
624        r, w = os.pipe()
625        code = """if 1:
626            import sys, builtins, pickle
627            with open({:d}, "wb") as f:
628                pickle.dump(id(sys.modules), f)
629                pickle.dump(id(builtins), f)
630            """.format(w)
631        with open(r, "rb") as f:
632            ret = support.run_in_subinterp(code)
633            self.assertEqual(ret, 0)
634            self.assertNotEqual(pickle.load(f), id(sys.modules))
635            self.assertNotEqual(pickle.load(f), id(builtins))
636
637    def test_subinterps_recent_language_features(self):
638        r, w = os.pipe()
639        code = """if 1:
640            import pickle
641            with open({:d}, "wb") as f:
642
643                @(lambda x:x)  # Py 3.9
644                def noop(x): return x
645
646                a = (b := f'1{{2}}3') + noop('x')  # Py 3.8 (:=) / 3.6 (f'')
647
648                async def foo(arg): return await arg  # Py 3.5
649
650                pickle.dump(dict(a=a, b=b), f)
651            """.format(w)
652
653        with open(r, "rb") as f:
654            ret = support.run_in_subinterp(code)
655            self.assertEqual(ret, 0)
656            self.assertEqual(pickle.load(f), {'a': '123x', 'b': '123'})
657
658    def test_mutate_exception(self):
659        """
660        Exceptions saved in global module state get shared between
661        individual module instances. This test checks whether or not
662        a change in one interpreter's module gets reflected into the
663        other ones.
664        """
665        import binascii
666
667        support.run_in_subinterp("import binascii; binascii.Error.foobar = 'foobar'")
668
669        self.assertFalse(hasattr(binascii.Error, "foobar"))
670
671
672class TestThreadState(unittest.TestCase):
673
674    @support.reap_threads
675    def test_thread_state(self):
676        # some extra thread-state tests driven via _testcapi
677        def target():
678            idents = []
679
680            def callback():
681                idents.append(threading.get_ident())
682
683            _testcapi._test_thread_state(callback)
684            a = b = callback
685            time.sleep(1)
686            # Check our main thread is in the list exactly 3 times.
687            self.assertEqual(idents.count(threading.get_ident()), 3,
688                             "Couldn't find main thread correctly in the list")
689
690        target()
691        t = threading.Thread(target=target)
692        t.start()
693        t.join()
694
695
696class Test_testcapi(unittest.TestCase):
697    locals().update((name, getattr(_testcapi, name))
698                    for name in dir(_testcapi)
699                    if name.startswith('test_') and not name.endswith('_code'))
700
701
702class Test_testinternalcapi(unittest.TestCase):
703    locals().update((name, getattr(_testinternalcapi, name))
704                    for name in dir(_testinternalcapi)
705                    if name.startswith('test_'))
706
707
708class PyMemDebugTests(unittest.TestCase):
709    PYTHONMALLOC = 'debug'
710    # '0x04c06e0' or '04C06E0'
711    PTR_REGEX = r'(?:0x)?[0-9a-fA-F]+'
712
713    def check(self, code):
714        with support.SuppressCrashReport():
715            out = assert_python_failure('-c', code,
716                                        PYTHONMALLOC=self.PYTHONMALLOC)
717        stderr = out.err
718        return stderr.decode('ascii', 'replace')
719
720    def test_buffer_overflow(self):
721        out = self.check('import _testcapi; _testcapi.pymem_buffer_overflow()')
722        regex = (r"Debug memory block at address p={ptr}: API 'm'\n"
723                 r"    16 bytes originally requested\n"
724                 r"    The [0-9] pad bytes at p-[0-9] are FORBIDDENBYTE, as expected.\n"
725                 r"    The [0-9] pad bytes at tail={ptr} are not all FORBIDDENBYTE \(0x[0-9a-f]{{2}}\):\n"
726                 r"        at tail\+0: 0x78 \*\*\* OUCH\n"
727                 r"        at tail\+1: 0xfd\n"
728                 r"        at tail\+2: 0xfd\n"
729                 r"        .*\n"
730                 r"(    The block was made by call #[0-9]+ to debug malloc/realloc.\n)?"
731                 r"    Data at p: cd cd cd .*\n"
732                 r"\n"
733                 r"Enable tracemalloc to get the memory block allocation traceback\n"
734                 r"\n"
735                 r"Fatal Python error: _PyMem_DebugRawFree: bad trailing pad byte")
736        regex = regex.format(ptr=self.PTR_REGEX)
737        regex = re.compile(regex, flags=re.DOTALL)
738        self.assertRegex(out, regex)
739
740    def test_api_misuse(self):
741        out = self.check('import _testcapi; _testcapi.pymem_api_misuse()')
742        regex = (r"Debug memory block at address p={ptr}: API 'm'\n"
743                 r"    16 bytes originally requested\n"
744                 r"    The [0-9] pad bytes at p-[0-9] are FORBIDDENBYTE, as expected.\n"
745                 r"    The [0-9] pad bytes at tail={ptr} are FORBIDDENBYTE, as expected.\n"
746                 r"(    The block was made by call #[0-9]+ to debug malloc/realloc.\n)?"
747                 r"    Data at p: cd cd cd .*\n"
748                 r"\n"
749                 r"Enable tracemalloc to get the memory block allocation traceback\n"
750                 r"\n"
751                 r"Fatal Python error: _PyMem_DebugRawFree: bad ID: Allocated using API 'm', verified using API 'r'\n")
752        regex = regex.format(ptr=self.PTR_REGEX)
753        self.assertRegex(out, regex)
754
755    def check_malloc_without_gil(self, code):
756        out = self.check(code)
757        expected = ('Fatal Python error: _PyMem_DebugMalloc: '
758                    'Python memory allocator called without holding the GIL')
759        self.assertIn(expected, out)
760
761    def test_pymem_malloc_without_gil(self):
762        # Debug hooks must raise an error if PyMem_Malloc() is called
763        # without holding the GIL
764        code = 'import _testcapi; _testcapi.pymem_malloc_without_gil()'
765        self.check_malloc_without_gil(code)
766
767    def test_pyobject_malloc_without_gil(self):
768        # Debug hooks must raise an error if PyObject_Malloc() is called
769        # without holding the GIL
770        code = 'import _testcapi; _testcapi.pyobject_malloc_without_gil()'
771        self.check_malloc_without_gil(code)
772
773    def check_pyobject_is_freed(self, func_name):
774        code = textwrap.dedent(f'''
775            import gc, os, sys, _testcapi
776            # Disable the GC to avoid crash on GC collection
777            gc.disable()
778            try:
779                _testcapi.{func_name}()
780                # Exit immediately to avoid a crash while deallocating
781                # the invalid object
782                os._exit(0)
783            except _testcapi.error:
784                os._exit(1)
785        ''')
786        assert_python_ok('-c', code, PYTHONMALLOC=self.PYTHONMALLOC)
787
788    def test_pyobject_null_is_freed(self):
789        self.check_pyobject_is_freed('check_pyobject_null_is_freed')
790
791    def test_pyobject_uninitialized_is_freed(self):
792        self.check_pyobject_is_freed('check_pyobject_uninitialized_is_freed')
793
794    def test_pyobject_forbidden_bytes_is_freed(self):
795        self.check_pyobject_is_freed('check_pyobject_forbidden_bytes_is_freed')
796
797    def test_pyobject_freed_is_freed(self):
798        self.check_pyobject_is_freed('check_pyobject_freed_is_freed')
799
800
801class PyMemMallocDebugTests(PyMemDebugTests):
802    PYTHONMALLOC = 'malloc_debug'
803
804
805@unittest.skipUnless(support.with_pymalloc(), 'need pymalloc')
806class PyMemPymallocDebugTests(PyMemDebugTests):
807    PYTHONMALLOC = 'pymalloc_debug'
808
809
810@unittest.skipUnless(Py_DEBUG, 'need Py_DEBUG')
811class PyMemDefaultTests(PyMemDebugTests):
812    # test default allocator of Python compiled in debug mode
813    PYTHONMALLOC = ''
814
815
816class Test_ModuleStateAccess(unittest.TestCase):
817    """Test access to module start (PEP 573)"""
818
819    # The C part of the tests lives in _testmultiphase, in a module called
820    # _testmultiphase_meth_state_access.
821    # This module has multi-phase initialization, unlike _testcapi.
822
823    def setUp(self):
824        fullname = '_testmultiphase_meth_state_access'  # XXX
825        origin = importlib.util.find_spec('_testmultiphase').origin
826        loader = importlib.machinery.ExtensionFileLoader(fullname, origin)
827        spec = importlib.util.spec_from_loader(fullname, loader)
828        module = importlib.util.module_from_spec(spec)
829        loader.exec_module(module)
830        self.module = module
831
832    def test_subclass_get_module(self):
833        """PyType_GetModule for defining_class"""
834        class StateAccessType_Subclass(self.module.StateAccessType):
835            pass
836
837        instance = StateAccessType_Subclass()
838        self.assertIs(instance.get_defining_module(), self.module)
839
840    def test_subclass_get_module_with_super(self):
841        class StateAccessType_Subclass(self.module.StateAccessType):
842            def get_defining_module(self):
843                return super().get_defining_module()
844
845        instance = StateAccessType_Subclass()
846        self.assertIs(instance.get_defining_module(), self.module)
847
848    def test_state_access(self):
849        """Checks methods defined with and without argument clinic
850
851        This tests a no-arg method (get_count) and a method with
852        both a positional and keyword argument.
853        """
854
855        a = self.module.StateAccessType()
856        b = self.module.StateAccessType()
857
858        methods = {
859            'clinic': a.increment_count_clinic,
860            'noclinic': a.increment_count_noclinic,
861        }
862
863        for name, increment_count in methods.items():
864            with self.subTest(name):
865                self.assertEqual(a.get_count(), b.get_count())
866                self.assertEqual(a.get_count(), 0)
867
868                increment_count()
869                self.assertEqual(a.get_count(), b.get_count())
870                self.assertEqual(a.get_count(), 1)
871
872                increment_count(3)
873                self.assertEqual(a.get_count(), b.get_count())
874                self.assertEqual(a.get_count(), 4)
875
876                increment_count(-2, twice=True)
877                self.assertEqual(a.get_count(), b.get_count())
878                self.assertEqual(a.get_count(), 0)
879
880                with self.assertRaises(TypeError):
881                    increment_count(thrice=3)
882
883                with self.assertRaises(TypeError):
884                    increment_count(1, 2, 3)
885
886
887if __name__ == "__main__":
888    unittest.main()
889