• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Run the _testcapi module tests (tests for the Python/C API):  by defn,
2# these are all functions _testcapi exports whose name begins with 'test_'.
3
4from collections import OrderedDict
5import importlib.machinery
6import importlib.util
7import os
8import pickle
9import random
10import re
11import subprocess
12import sys
13import textwrap
14import threading
15import time
16import unittest
17import weakref
18from test import support
19from test.support import MISSING_C_DOCSTRINGS
20from test.support import import_helper
21from test.support import threading_helper
22from test.support import warnings_helper
23from test.support.script_helper import assert_python_failure, assert_python_ok
24try:
25    import _posixsubprocess
26except ImportError:
27    _posixsubprocess = None
28
29# Skip this test if the _testcapi module isn't available.
30_testcapi = import_helper.import_module('_testcapi')
31
32import _testinternalcapi
33
34# Were we compiled --with-pydebug or with #define Py_DEBUG?
35Py_DEBUG = hasattr(sys, 'gettotalrefcount')
36
37
38def decode_stderr(err):
39    return err.decode('utf-8', 'replace').replace('\r', '')
40
41
42def testfunction(self):
43    """some doc"""
44    return self
45
46
47class InstanceMethod:
48    id = _testcapi.instancemethod(id)
49    testfunction = _testcapi.instancemethod(testfunction)
50
51class CAPITest(unittest.TestCase):
52
53    def test_instancemethod(self):
54        inst = InstanceMethod()
55        self.assertEqual(id(inst), inst.id())
56        self.assertTrue(inst.testfunction() is inst)
57        self.assertEqual(inst.testfunction.__doc__, testfunction.__doc__)
58        self.assertEqual(InstanceMethod.testfunction.__doc__, testfunction.__doc__)
59
60        InstanceMethod.testfunction.attribute = "test"
61        self.assertEqual(testfunction.attribute, "test")
62        self.assertRaises(AttributeError, setattr, inst.testfunction, "attribute", "test")
63
64    def test_no_FatalError_infinite_loop(self):
65        with support.SuppressCrashReport():
66            p = subprocess.Popen([sys.executable, "-c",
67                                  'import _testcapi;'
68                                  '_testcapi.crash_no_current_thread()'],
69                                 stdout=subprocess.PIPE,
70                                 stderr=subprocess.PIPE)
71        (out, err) = p.communicate()
72        self.assertEqual(out, b'')
73        # This used to cause an infinite loop.
74        self.assertTrue(err.rstrip().startswith(
75                         b'Fatal Python error: '
76                         b'PyThreadState_Get: '
77                         b'the function must be called with the GIL held, '
78                         b'but the GIL is released '
79                         b'(the current Python thread state is NULL)'),
80                        err)
81
82    def test_memoryview_from_NULL_pointer(self):
83        self.assertRaises(ValueError, _testcapi.make_memoryview_from_NULL_pointer)
84
85    def test_exc_info(self):
86        raised_exception = ValueError("5")
87        new_exc = TypeError("TEST")
88        try:
89            raise raised_exception
90        except ValueError as e:
91            tb = e.__traceback__
92            orig_sys_exc_info = sys.exc_info()
93            orig_exc_info = _testcapi.set_exc_info(new_exc.__class__, new_exc, None)
94            new_sys_exc_info = sys.exc_info()
95            new_exc_info = _testcapi.set_exc_info(*orig_exc_info)
96            reset_sys_exc_info = sys.exc_info()
97
98            self.assertEqual(orig_exc_info[1], e)
99
100            self.assertSequenceEqual(orig_exc_info, (raised_exception.__class__, raised_exception, tb))
101            self.assertSequenceEqual(orig_sys_exc_info, orig_exc_info)
102            self.assertSequenceEqual(reset_sys_exc_info, orig_exc_info)
103            self.assertSequenceEqual(new_exc_info, (new_exc.__class__, new_exc, None))
104            self.assertSequenceEqual(new_sys_exc_info, new_exc_info)
105        else:
106            self.assertTrue(False)
107
108    @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.')
109    def test_seq_bytes_to_charp_array(self):
110        # Issue #15732: crash in _PySequence_BytesToCharpArray()
111        class Z(object):
112            def __len__(self):
113                return 1
114        self.assertRaises(TypeError, _posixsubprocess.fork_exec,
115                          1,Z(),3,(1, 2),5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21)
116        # Issue #15736: overflow in _PySequence_BytesToCharpArray()
117        class Z(object):
118            def __len__(self):
119                return sys.maxsize
120            def __getitem__(self, i):
121                return b'x'
122        self.assertRaises(MemoryError, _posixsubprocess.fork_exec,
123                          1,Z(),3,(1, 2),5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21)
124
125    @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.')
126    def test_subprocess_fork_exec(self):
127        class Z(object):
128            def __len__(self):
129                return 1
130
131        # Issue #15738: crash in subprocess_fork_exec()
132        self.assertRaises(TypeError, _posixsubprocess.fork_exec,
133                          Z(),[b'1'],3,(1, 2),5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21)
134
135    @unittest.skipIf(MISSING_C_DOCSTRINGS,
136                     "Signature information for builtins requires docstrings")
137    def test_docstring_signature_parsing(self):
138
139        self.assertEqual(_testcapi.no_docstring.__doc__, None)
140        self.assertEqual(_testcapi.no_docstring.__text_signature__, None)
141
142        self.assertEqual(_testcapi.docstring_empty.__doc__, None)
143        self.assertEqual(_testcapi.docstring_empty.__text_signature__, None)
144
145        self.assertEqual(_testcapi.docstring_no_signature.__doc__,
146            "This docstring has no signature.")
147        self.assertEqual(_testcapi.docstring_no_signature.__text_signature__, None)
148
149        self.assertEqual(_testcapi.docstring_with_invalid_signature.__doc__,
150            "docstring_with_invalid_signature($module, /, boo)\n"
151            "\n"
152            "This docstring has an invalid signature."
153            )
154        self.assertEqual(_testcapi.docstring_with_invalid_signature.__text_signature__, None)
155
156        self.assertEqual(_testcapi.docstring_with_invalid_signature2.__doc__,
157            "docstring_with_invalid_signature2($module, /, boo)\n"
158            "\n"
159            "--\n"
160            "\n"
161            "This docstring also has an invalid signature."
162            )
163        self.assertEqual(_testcapi.docstring_with_invalid_signature2.__text_signature__, None)
164
165        self.assertEqual(_testcapi.docstring_with_signature.__doc__,
166            "This docstring has a valid signature.")
167        self.assertEqual(_testcapi.docstring_with_signature.__text_signature__, "($module, /, sig)")
168
169        self.assertEqual(_testcapi.docstring_with_signature_but_no_doc.__doc__, None)
170        self.assertEqual(_testcapi.docstring_with_signature_but_no_doc.__text_signature__,
171            "($module, /, sig)")
172
173        self.assertEqual(_testcapi.docstring_with_signature_and_extra_newlines.__doc__,
174            "\nThis docstring has a valid signature and some extra newlines.")
175        self.assertEqual(_testcapi.docstring_with_signature_and_extra_newlines.__text_signature__,
176            "($module, /, parameter)")
177
178    def test_c_type_with_matrix_multiplication(self):
179        M = _testcapi.matmulType
180        m1 = M()
181        m2 = M()
182        self.assertEqual(m1 @ m2, ("matmul", m1, m2))
183        self.assertEqual(m1 @ 42, ("matmul", m1, 42))
184        self.assertEqual(42 @ m1, ("matmul", 42, m1))
185        o = m1
186        o @= m2
187        self.assertEqual(o, ("imatmul", m1, m2))
188        o = m1
189        o @= 42
190        self.assertEqual(o, ("imatmul", m1, 42))
191        o = 42
192        o @= m1
193        self.assertEqual(o, ("matmul", 42, m1))
194
195    def test_c_type_with_ipow(self):
196        # When the __ipow__ method of a type was implemented in C, using the
197        # modulo param would cause segfaults.
198        o = _testcapi.ipowType()
199        self.assertEqual(o.__ipow__(1), (1, None))
200        self.assertEqual(o.__ipow__(2, 2), (2, 2))
201
202    def test_return_null_without_error(self):
203        # Issue #23571: A function must not return NULL without setting an
204        # error
205        if Py_DEBUG:
206            code = textwrap.dedent("""
207                import _testcapi
208                from test import support
209
210                with support.SuppressCrashReport():
211                    _testcapi.return_null_without_error()
212            """)
213            rc, out, err = assert_python_failure('-c', code)
214            err = decode_stderr(err)
215            self.assertRegex(err,
216                r'Fatal Python error: _Py_CheckFunctionResult: '
217                    r'a function returned NULL without setting an exception\n'
218                r'Python runtime state: initialized\n'
219                r'SystemError: <built-in function return_null_without_error> '
220                    r'returned NULL without setting an exception\n'
221                r'\n'
222                r'Current thread.*:\n'
223                r'  File .*", line 6 in <module>\n')
224        else:
225            with self.assertRaises(SystemError) as cm:
226                _testcapi.return_null_without_error()
227            self.assertRegex(str(cm.exception),
228                             'return_null_without_error.* '
229                             'returned NULL without setting an exception')
230
231    def test_return_result_with_error(self):
232        # Issue #23571: A function must not return a result with an error set
233        if Py_DEBUG:
234            code = textwrap.dedent("""
235                import _testcapi
236                from test import support
237
238                with support.SuppressCrashReport():
239                    _testcapi.return_result_with_error()
240            """)
241            rc, out, err = assert_python_failure('-c', code)
242            err = decode_stderr(err)
243            self.assertRegex(err,
244                    r'Fatal Python error: _Py_CheckFunctionResult: '
245                        r'a function returned a result with an exception set\n'
246                    r'Python runtime state: initialized\n'
247                    r'ValueError\n'
248                    r'\n'
249                    r'The above exception was the direct cause '
250                        r'of the following exception:\n'
251                    r'\n'
252                    r'SystemError: <built-in '
253                        r'function return_result_with_error> '
254                        r'returned a result with an exception set\n'
255                    r'\n'
256                    r'Current thread.*:\n'
257                    r'  File .*, line 6 in <module>\n')
258        else:
259            with self.assertRaises(SystemError) as cm:
260                _testcapi.return_result_with_error()
261            self.assertRegex(str(cm.exception),
262                             'return_result_with_error.* '
263                             'returned a result with an exception set')
264
265    def test_getitem_with_error(self):
266        # Test _Py_CheckSlotResult(). Raise an exception and then calls
267        # PyObject_GetItem(): check that the assertion catches the bug.
268        # PyObject_GetItem() must not be called with an exception set.
269        code = textwrap.dedent("""
270            import _testcapi
271            from test import support
272
273            with support.SuppressCrashReport():
274                _testcapi.getitem_with_error({1: 2}, 1)
275        """)
276        rc, out, err = assert_python_failure('-c', code)
277        err = decode_stderr(err)
278        if 'SystemError: ' not in err:
279            self.assertRegex(err,
280                    r'Fatal Python error: _Py_CheckSlotResult: '
281                        r'Slot __getitem__ of type dict succeeded '
282                        r'with an exception set\n'
283                    r'Python runtime state: initialized\n'
284                    r'ValueError: bug\n'
285                    r'\n'
286                    r'Current thread .* \(most recent call first\):\n'
287                    r'  File .*, line 6 in <module>\n'
288                    r'\n'
289                    r'Extension modules: _testcapi \(total: 1\)\n')
290        else:
291            # Python built with NDEBUG macro defined:
292            # test _Py_CheckFunctionResult() instead.
293            self.assertIn('returned a result with an exception set', err)
294
295    def test_buildvalue_N(self):
296        _testcapi.test_buildvalue_N()
297
298    def test_set_nomemory(self):
299        code = """if 1:
300            import _testcapi
301
302            class C(): pass
303
304            # The first loop tests both functions and that remove_mem_hooks()
305            # can be called twice in a row. The second loop checks a call to
306            # set_nomemory() after a call to remove_mem_hooks(). The third
307            # loop checks the start and stop arguments of set_nomemory().
308            for outer_cnt in range(1, 4):
309                start = 10 * outer_cnt
310                for j in range(100):
311                    if j == 0:
312                        if outer_cnt != 3:
313                            _testcapi.set_nomemory(start)
314                        else:
315                            _testcapi.set_nomemory(start, start + 1)
316                    try:
317                        C()
318                    except MemoryError as e:
319                        if outer_cnt != 3:
320                            _testcapi.remove_mem_hooks()
321                        print('MemoryError', outer_cnt, j)
322                        _testcapi.remove_mem_hooks()
323                        break
324        """
325        rc, out, err = assert_python_ok('-c', code)
326        self.assertIn(b'MemoryError 1 10', out)
327        self.assertIn(b'MemoryError 2 20', out)
328        self.assertIn(b'MemoryError 3 30', out)
329
330    def test_mapping_keys_values_items(self):
331        class Mapping1(dict):
332            def keys(self):
333                return list(super().keys())
334            def values(self):
335                return list(super().values())
336            def items(self):
337                return list(super().items())
338        class Mapping2(dict):
339            def keys(self):
340                return tuple(super().keys())
341            def values(self):
342                return tuple(super().values())
343            def items(self):
344                return tuple(super().items())
345        dict_obj = {'foo': 1, 'bar': 2, 'spam': 3}
346
347        for mapping in [{}, OrderedDict(), Mapping1(), Mapping2(),
348                        dict_obj, OrderedDict(dict_obj),
349                        Mapping1(dict_obj), Mapping2(dict_obj)]:
350            self.assertListEqual(_testcapi.get_mapping_keys(mapping),
351                                 list(mapping.keys()))
352            self.assertListEqual(_testcapi.get_mapping_values(mapping),
353                                 list(mapping.values()))
354            self.assertListEqual(_testcapi.get_mapping_items(mapping),
355                                 list(mapping.items()))
356
357    def test_mapping_keys_values_items_bad_arg(self):
358        self.assertRaises(AttributeError, _testcapi.get_mapping_keys, None)
359        self.assertRaises(AttributeError, _testcapi.get_mapping_values, None)
360        self.assertRaises(AttributeError, _testcapi.get_mapping_items, None)
361
362        class BadMapping:
363            def keys(self):
364                return None
365            def values(self):
366                return None
367            def items(self):
368                return None
369        bad_mapping = BadMapping()
370        self.assertRaises(TypeError, _testcapi.get_mapping_keys, bad_mapping)
371        self.assertRaises(TypeError, _testcapi.get_mapping_values, bad_mapping)
372        self.assertRaises(TypeError, _testcapi.get_mapping_items, bad_mapping)
373
374    @unittest.skipUnless(hasattr(_testcapi, 'negative_refcount'),
375                         'need _testcapi.negative_refcount')
376    def test_negative_refcount(self):
377        # bpo-35059: Check that Py_DECREF() reports the correct filename
378        # when calling _Py_NegativeRefcount() to abort Python.
379        code = textwrap.dedent("""
380            import _testcapi
381            from test import support
382
383            with support.SuppressCrashReport():
384                _testcapi.negative_refcount()
385        """)
386        rc, out, err = assert_python_failure('-c', code)
387        self.assertRegex(err,
388                         br'_testcapimodule\.c:[0-9]+: '
389                         br'_Py_NegativeRefcount: Assertion failed: '
390                         br'object has negative ref count')
391
392    def test_trashcan_subclass(self):
393        # bpo-35983: Check that the trashcan mechanism for "list" is NOT
394        # activated when its tp_dealloc is being called by a subclass
395        from _testcapi import MyList
396        L = None
397        for i in range(1000):
398            L = MyList((L,))
399
400    @support.requires_resource('cpu')
401    def test_trashcan_python_class1(self):
402        self.do_test_trashcan_python_class(list)
403
404    @support.requires_resource('cpu')
405    def test_trashcan_python_class2(self):
406        from _testcapi import MyList
407        self.do_test_trashcan_python_class(MyList)
408
409    def do_test_trashcan_python_class(self, base):
410        # Check that the trashcan mechanism works properly for a Python
411        # subclass of a class using the trashcan (this specific test assumes
412        # that the base class "base" behaves like list)
413        class PyList(base):
414            # Count the number of PyList instances to verify that there is
415            # no memory leak
416            num = 0
417            def __init__(self, *args):
418                __class__.num += 1
419                super().__init__(*args)
420            def __del__(self):
421                __class__.num -= 1
422
423        for parity in (0, 1):
424            L = None
425            # We need in the order of 2**20 iterations here such that a
426            # typical 8MB stack would overflow without the trashcan.
427            for i in range(2**20):
428                L = PyList((L,))
429                L.attr = i
430            if parity:
431                # Add one additional nesting layer
432                L = (L,)
433            self.assertGreater(PyList.num, 0)
434            del L
435            self.assertEqual(PyList.num, 0)
436
437    def test_heap_ctype_doc_and_text_signature(self):
438        self.assertEqual(_testcapi.HeapDocCType.__doc__, "somedoc")
439        self.assertEqual(_testcapi.HeapDocCType.__text_signature__, "(arg1, arg2)")
440
441    def test_null_type_doc(self):
442        self.assertEqual(_testcapi.NullTpDocType.__doc__, None)
443
444    def test_subclass_of_heap_gc_ctype_with_tpdealloc_decrefs_once(self):
445        class HeapGcCTypeSubclass(_testcapi.HeapGcCType):
446            def __init__(self):
447                self.value2 = 20
448                super().__init__()
449
450        subclass_instance = HeapGcCTypeSubclass()
451        type_refcnt = sys.getrefcount(HeapGcCTypeSubclass)
452
453        # Test that subclass instance was fully created
454        self.assertEqual(subclass_instance.value, 10)
455        self.assertEqual(subclass_instance.value2, 20)
456
457        # Test that the type reference count is only decremented once
458        del subclass_instance
459        self.assertEqual(type_refcnt - 1, sys.getrefcount(HeapGcCTypeSubclass))
460
461    def test_subclass_of_heap_gc_ctype_with_del_modifying_dunder_class_only_decrefs_once(self):
462        class A(_testcapi.HeapGcCType):
463            def __init__(self):
464                self.value2 = 20
465                super().__init__()
466
467        class B(A):
468            def __init__(self):
469                super().__init__()
470
471            def __del__(self):
472                self.__class__ = A
473                A.refcnt_in_del = sys.getrefcount(A)
474                B.refcnt_in_del = sys.getrefcount(B)
475
476        subclass_instance = B()
477        type_refcnt = sys.getrefcount(B)
478        new_type_refcnt = sys.getrefcount(A)
479
480        # Test that subclass instance was fully created
481        self.assertEqual(subclass_instance.value, 10)
482        self.assertEqual(subclass_instance.value2, 20)
483
484        del subclass_instance
485
486        # Test that setting __class__ modified the reference counts of the types
487        self.assertEqual(type_refcnt - 1, B.refcnt_in_del)
488        self.assertEqual(new_type_refcnt + 1, A.refcnt_in_del)
489
490        # Test that the original type already has decreased its refcnt
491        self.assertEqual(type_refcnt - 1, sys.getrefcount(B))
492
493        # Test that subtype_dealloc decref the newly assigned __class__ only once
494        self.assertEqual(new_type_refcnt, sys.getrefcount(A))
495
496    def test_heaptype_with_dict(self):
497        inst = _testcapi.HeapCTypeWithDict()
498        inst.foo = 42
499        self.assertEqual(inst.foo, 42)
500        self.assertEqual(inst.dictobj, inst.__dict__)
501        self.assertEqual(inst.dictobj, {"foo": 42})
502
503        inst = _testcapi.HeapCTypeWithDict()
504        self.assertEqual({}, inst.__dict__)
505
506    def test_heaptype_with_negative_dict(self):
507        inst = _testcapi.HeapCTypeWithNegativeDict()
508        inst.foo = 42
509        self.assertEqual(inst.foo, 42)
510        self.assertEqual(inst.dictobj, inst.__dict__)
511        self.assertEqual(inst.dictobj, {"foo": 42})
512
513        inst = _testcapi.HeapCTypeWithNegativeDict()
514        self.assertEqual({}, inst.__dict__)
515
516    def test_heaptype_with_weakref(self):
517        inst = _testcapi.HeapCTypeWithWeakref()
518        ref = weakref.ref(inst)
519        self.assertEqual(ref(), inst)
520        self.assertEqual(inst.weakreflist, ref)
521
522    def test_heaptype_with_buffer(self):
523        inst = _testcapi.HeapCTypeWithBuffer()
524        b = bytes(inst)
525        self.assertEqual(b, b"1234")
526
527    def test_c_subclass_of_heap_ctype_with_tpdealloc_decrefs_once(self):
528        subclass_instance = _testcapi.HeapCTypeSubclass()
529        type_refcnt = sys.getrefcount(_testcapi.HeapCTypeSubclass)
530
531        # Test that subclass instance was fully created
532        self.assertEqual(subclass_instance.value, 10)
533        self.assertEqual(subclass_instance.value2, 20)
534
535        # Test that the type reference count is only decremented once
536        del subclass_instance
537        self.assertEqual(type_refcnt - 1, sys.getrefcount(_testcapi.HeapCTypeSubclass))
538
539    def test_c_subclass_of_heap_ctype_with_del_modifying_dunder_class_only_decrefs_once(self):
540        subclass_instance = _testcapi.HeapCTypeSubclassWithFinalizer()
541        type_refcnt = sys.getrefcount(_testcapi.HeapCTypeSubclassWithFinalizer)
542        new_type_refcnt = sys.getrefcount(_testcapi.HeapCTypeSubclass)
543
544        # Test that subclass instance was fully created
545        self.assertEqual(subclass_instance.value, 10)
546        self.assertEqual(subclass_instance.value2, 20)
547
548        # The tp_finalize slot will set __class__ to HeapCTypeSubclass
549        del subclass_instance
550
551        # Test that setting __class__ modified the reference counts of the types
552        self.assertEqual(type_refcnt - 1, _testcapi.HeapCTypeSubclassWithFinalizer.refcnt_in_del)
553        self.assertEqual(new_type_refcnt + 1, _testcapi.HeapCTypeSubclass.refcnt_in_del)
554
555        # Test that the original type already has decreased its refcnt
556        self.assertEqual(type_refcnt - 1, sys.getrefcount(_testcapi.HeapCTypeSubclassWithFinalizer))
557
558        # Test that subtype_dealloc decref the newly assigned __class__ only once
559        self.assertEqual(new_type_refcnt, sys.getrefcount(_testcapi.HeapCTypeSubclass))
560
561    def test_heaptype_with_setattro(self):
562        obj = _testcapi.HeapCTypeSetattr()
563        self.assertEqual(obj.pvalue, 10)
564        obj.value = 12
565        self.assertEqual(obj.pvalue, 12)
566        del obj.value
567        self.assertEqual(obj.pvalue, 0)
568
569    def test_pynumber_tobase(self):
570        from _testcapi import pynumber_tobase
571        self.assertEqual(pynumber_tobase(123, 2), '0b1111011')
572        self.assertEqual(pynumber_tobase(123, 8), '0o173')
573        self.assertEqual(pynumber_tobase(123, 10), '123')
574        self.assertEqual(pynumber_tobase(123, 16), '0x7b')
575        self.assertEqual(pynumber_tobase(-123, 2), '-0b1111011')
576        self.assertEqual(pynumber_tobase(-123, 8), '-0o173')
577        self.assertEqual(pynumber_tobase(-123, 10), '-123')
578        self.assertEqual(pynumber_tobase(-123, 16), '-0x7b')
579        self.assertRaises(TypeError, pynumber_tobase, 123.0, 10)
580        self.assertRaises(TypeError, pynumber_tobase, '123', 10)
581        self.assertRaises(SystemError, pynumber_tobase, 123, 0)
582
583    def check_fatal_error(self, code, expected, not_expected=()):
584        with support.SuppressCrashReport():
585            rc, out, err = assert_python_failure('-sSI', '-c', code)
586
587        err = decode_stderr(err)
588        self.assertIn('Fatal Python error: test_fatal_error: MESSAGE\n',
589                      err)
590
591        match = re.search(r'^Extension modules:(.*) \(total: ([0-9]+)\)$',
592                          err, re.MULTILINE)
593        if not match:
594            self.fail(f"Cannot find 'Extension modules:' in {err!r}")
595        modules = set(match.group(1).strip().split(', '))
596        total = int(match.group(2))
597
598        for name in expected:
599            self.assertIn(name, modules)
600        for name in not_expected:
601            self.assertNotIn(name, modules)
602        self.assertEqual(len(modules), total)
603
604    def test_fatal_error(self):
605        # By default, stdlib extension modules are ignored,
606        # but not test modules.
607        expected = ('_testcapi',)
608        not_expected = ('sys',)
609        code = 'import _testcapi, sys; _testcapi.fatal_error(b"MESSAGE")'
610        self.check_fatal_error(code, expected, not_expected)
611
612        # Mark _testcapi as stdlib module, but not sys
613        expected = ('sys',)
614        not_expected = ('_testcapi',)
615        code = textwrap.dedent('''
616            import _testcapi, sys
617            sys.stdlib_module_names = frozenset({"_testcapi"})
618            _testcapi.fatal_error(b"MESSAGE")
619        ''')
620        self.check_fatal_error(code, expected)
621
622    def test_pyobject_repr_from_null(self):
623        s = _testcapi.pyobject_repr_from_null()
624        self.assertEqual(s, '<NULL>')
625
626    def test_pyobject_str_from_null(self):
627        s = _testcapi.pyobject_str_from_null()
628        self.assertEqual(s, '<NULL>')
629
630    def test_pyobject_bytes_from_null(self):
631        s = _testcapi.pyobject_bytes_from_null()
632        self.assertEqual(s, b'<NULL>')
633
634    def test_Py_CompileString(self):
635        # Check that Py_CompileString respects the coding cookie
636        _compile = _testcapi.Py_CompileString
637        code = b"# -*- coding: latin1 -*-\nprint('\xc2\xa4')\n"
638        result = _compile(code)
639        expected = compile(code, "<string>", "exec")
640        self.assertEqual(result.co_consts, expected.co_consts)
641
642
643class TestPendingCalls(unittest.TestCase):
644
645    def pendingcalls_submit(self, l, n):
646        def callback():
647            #this function can be interrupted by thread switching so let's
648            #use an atomic operation
649            l.append(None)
650
651        for i in range(n):
652            time.sleep(random.random()*0.02) #0.01 secs on average
653            #try submitting callback until successful.
654            #rely on regular interrupt to flush queue if we are
655            #unsuccessful.
656            while True:
657                if _testcapi._pending_threadfunc(callback):
658                    break
659
660    def pendingcalls_wait(self, l, n, context = None):
661        #now, stick around until l[0] has grown to 10
662        count = 0
663        while len(l) != n:
664            #this busy loop is where we expect to be interrupted to
665            #run our callbacks.  Note that callbacks are only run on the
666            #main thread
667            if False and support.verbose:
668                print("(%i)"%(len(l),),)
669            for i in range(1000):
670                a = i*i
671            if context and not context.event.is_set():
672                continue
673            count += 1
674            self.assertTrue(count < 10000,
675                "timeout waiting for %i callbacks, got %i"%(n, len(l)))
676        if False and support.verbose:
677            print("(%i)"%(len(l),))
678
679    def test_pendingcalls_threaded(self):
680
681        #do every callback on a separate thread
682        n = 32 #total callbacks
683        threads = []
684        class foo(object):pass
685        context = foo()
686        context.l = []
687        context.n = 2 #submits per thread
688        context.nThreads = n // context.n
689        context.nFinished = 0
690        context.lock = threading.Lock()
691        context.event = threading.Event()
692
693        threads = [threading.Thread(target=self.pendingcalls_thread,
694                                    args=(context,))
695                   for i in range(context.nThreads)]
696        with threading_helper.start_threads(threads):
697            self.pendingcalls_wait(context.l, n, context)
698
699    def pendingcalls_thread(self, context):
700        try:
701            self.pendingcalls_submit(context.l, context.n)
702        finally:
703            with context.lock:
704                context.nFinished += 1
705                nFinished = context.nFinished
706                if False and support.verbose:
707                    print("finished threads: ", nFinished)
708            if nFinished == context.nThreads:
709                context.event.set()
710
711    def test_pendingcalls_non_threaded(self):
712        #again, just using the main thread, likely they will all be dispatched at
713        #once.  It is ok to ask for too many, because we loop until we find a slot.
714        #the loop can be interrupted to dispatch.
715        #there are only 32 dispatch slots, so we go for twice that!
716        l = []
717        n = 64
718        self.pendingcalls_submit(l, n)
719        self.pendingcalls_wait(l, n)
720
721
722class SubinterpreterTest(unittest.TestCase):
723
724    def test_subinterps(self):
725        import builtins
726        r, w = os.pipe()
727        code = """if 1:
728            import sys, builtins, pickle
729            with open({:d}, "wb") as f:
730                pickle.dump(id(sys.modules), f)
731                pickle.dump(id(builtins), f)
732            """.format(w)
733        with open(r, "rb") as f:
734            ret = support.run_in_subinterp(code)
735            self.assertEqual(ret, 0)
736            self.assertNotEqual(pickle.load(f), id(sys.modules))
737            self.assertNotEqual(pickle.load(f), id(builtins))
738
739    def test_subinterps_recent_language_features(self):
740        r, w = os.pipe()
741        code = """if 1:
742            import pickle
743            with open({:d}, "wb") as f:
744
745                @(lambda x:x)  # Py 3.9
746                def noop(x): return x
747
748                a = (b := f'1{{2}}3') + noop('x')  # Py 3.8 (:=) / 3.6 (f'')
749
750                async def foo(arg): return await arg  # Py 3.5
751
752                pickle.dump(dict(a=a, b=b), f)
753            """.format(w)
754
755        with open(r, "rb") as f:
756            ret = support.run_in_subinterp(code)
757            self.assertEqual(ret, 0)
758            self.assertEqual(pickle.load(f), {'a': '123x', 'b': '123'})
759
760    def test_mutate_exception(self):
761        """
762        Exceptions saved in global module state get shared between
763        individual module instances. This test checks whether or not
764        a change in one interpreter's module gets reflected into the
765        other ones.
766        """
767        import binascii
768
769        support.run_in_subinterp("import binascii; binascii.Error.foobar = 'foobar'")
770
771        self.assertFalse(hasattr(binascii.Error, "foobar"))
772
773    def test_module_state_shared_in_global(self):
774        """
775        bpo-44050: Extension module state should be shared between interpreters
776        when it doesn't support sub-interpreters.
777        """
778        r, w = os.pipe()
779        self.addCleanup(os.close, r)
780        self.addCleanup(os.close, w)
781
782        script = textwrap.dedent(f"""
783            import importlib.machinery
784            import importlib.util
785            import os
786
787            fullname = '_test_module_state_shared'
788            origin = importlib.util.find_spec('_testmultiphase').origin
789            loader = importlib.machinery.ExtensionFileLoader(fullname, origin)
790            spec = importlib.util.spec_from_loader(fullname, loader)
791            module = importlib.util.module_from_spec(spec)
792            attr_id = str(id(module.Error)).encode()
793
794            os.write({w}, attr_id)
795            """)
796        exec(script)
797        main_attr_id = os.read(r, 100)
798
799        ret = support.run_in_subinterp(script)
800        self.assertEqual(ret, 0)
801        subinterp_attr_id = os.read(r, 100)
802        self.assertEqual(main_attr_id, subinterp_attr_id)
803
804
805class TestThreadState(unittest.TestCase):
806
807    @threading_helper.reap_threads
808    def test_thread_state(self):
809        # some extra thread-state tests driven via _testcapi
810        def target():
811            idents = []
812
813            def callback():
814                idents.append(threading.get_ident())
815
816            _testcapi._test_thread_state(callback)
817            a = b = callback
818            time.sleep(1)
819            # Check our main thread is in the list exactly 3 times.
820            self.assertEqual(idents.count(threading.get_ident()), 3,
821                             "Couldn't find main thread correctly in the list")
822
823        target()
824        t = threading.Thread(target=target)
825        t.start()
826        t.join()
827
828
829class Test_testcapi(unittest.TestCase):
830    locals().update((name, getattr(_testcapi, name))
831                    for name in dir(_testcapi)
832                    if name.startswith('test_') and not name.endswith('_code'))
833
834    # Suppress warning from PyUnicode_FromUnicode().
835    @warnings_helper.ignore_warnings(category=DeprecationWarning)
836    def test_widechar(self):
837        _testcapi.test_widechar()
838
839
840class Test_testinternalcapi(unittest.TestCase):
841    locals().update((name, getattr(_testinternalcapi, name))
842                    for name in dir(_testinternalcapi)
843                    if name.startswith('test_'))
844
845
846class PyMemDebugTests(unittest.TestCase):
847    PYTHONMALLOC = 'debug'
848    # '0x04c06e0' or '04C06E0'
849    PTR_REGEX = r'(?:0x)?[0-9a-fA-F]+'
850
851    def check(self, code):
852        with support.SuppressCrashReport():
853            out = assert_python_failure(
854                '-c', code,
855                PYTHONMALLOC=self.PYTHONMALLOC,
856                # FreeBSD: instruct jemalloc to not fill freed() memory
857                # with junk byte 0x5a, see JEMALLOC(3)
858                MALLOC_CONF="junk:false",
859            )
860        stderr = out.err
861        return stderr.decode('ascii', 'replace')
862
863    def test_buffer_overflow(self):
864        out = self.check('import _testcapi; _testcapi.pymem_buffer_overflow()')
865        regex = (r"Debug memory block at address p={ptr}: API 'm'\n"
866                 r"    16 bytes originally requested\n"
867                 r"    The [0-9] pad bytes at p-[0-9] are FORBIDDENBYTE, as expected.\n"
868                 r"    The [0-9] pad bytes at tail={ptr} are not all FORBIDDENBYTE \(0x[0-9a-f]{{2}}\):\n"
869                 r"        at tail\+0: 0x78 \*\*\* OUCH\n"
870                 r"        at tail\+1: 0xfd\n"
871                 r"        at tail\+2: 0xfd\n"
872                 r"        .*\n"
873                 r"(    The block was made by call #[0-9]+ to debug malloc/realloc.\n)?"
874                 r"    Data at p: cd cd cd .*\n"
875                 r"\n"
876                 r"Enable tracemalloc to get the memory block allocation traceback\n"
877                 r"\n"
878                 r"Fatal Python error: _PyMem_DebugRawFree: bad trailing pad byte")
879        regex = regex.format(ptr=self.PTR_REGEX)
880        regex = re.compile(regex, flags=re.DOTALL)
881        self.assertRegex(out, regex)
882
883    def test_api_misuse(self):
884        out = self.check('import _testcapi; _testcapi.pymem_api_misuse()')
885        regex = (r"Debug memory block at address p={ptr}: API 'm'\n"
886                 r"    16 bytes originally requested\n"
887                 r"    The [0-9] pad bytes at p-[0-9] are FORBIDDENBYTE, as expected.\n"
888                 r"    The [0-9] pad bytes at tail={ptr} are FORBIDDENBYTE, as expected.\n"
889                 r"(    The block was made by call #[0-9]+ to debug malloc/realloc.\n)?"
890                 r"    Data at p: cd cd cd .*\n"
891                 r"\n"
892                 r"Enable tracemalloc to get the memory block allocation traceback\n"
893                 r"\n"
894                 r"Fatal Python error: _PyMem_DebugRawFree: bad ID: Allocated using API 'm', verified using API 'r'\n")
895        regex = regex.format(ptr=self.PTR_REGEX)
896        self.assertRegex(out, regex)
897
898    def check_malloc_without_gil(self, code):
899        out = self.check(code)
900        expected = ('Fatal Python error: _PyMem_DebugMalloc: '
901                    'Python memory allocator called without holding the GIL')
902        self.assertIn(expected, out)
903
904    def test_pymem_malloc_without_gil(self):
905        # Debug hooks must raise an error if PyMem_Malloc() is called
906        # without holding the GIL
907        code = 'import _testcapi; _testcapi.pymem_malloc_without_gil()'
908        self.check_malloc_without_gil(code)
909
910    def test_pyobject_malloc_without_gil(self):
911        # Debug hooks must raise an error if PyObject_Malloc() is called
912        # without holding the GIL
913        code = 'import _testcapi; _testcapi.pyobject_malloc_without_gil()'
914        self.check_malloc_without_gil(code)
915
916    def check_pyobject_is_freed(self, func_name):
917        code = textwrap.dedent(f'''
918            import gc, os, sys, _testcapi
919            # Disable the GC to avoid crash on GC collection
920            gc.disable()
921            try:
922                _testcapi.{func_name}()
923                # Exit immediately to avoid a crash while deallocating
924                # the invalid object
925                os._exit(0)
926            except _testcapi.error:
927                os._exit(1)
928        ''')
929        assert_python_ok(
930            '-c', code,
931            PYTHONMALLOC=self.PYTHONMALLOC,
932            MALLOC_CONF="junk:false",
933        )
934
935    def test_pyobject_null_is_freed(self):
936        self.check_pyobject_is_freed('check_pyobject_null_is_freed')
937
938    def test_pyobject_uninitialized_is_freed(self):
939        self.check_pyobject_is_freed('check_pyobject_uninitialized_is_freed')
940
941    def test_pyobject_forbidden_bytes_is_freed(self):
942        self.check_pyobject_is_freed('check_pyobject_forbidden_bytes_is_freed')
943
944    def test_pyobject_freed_is_freed(self):
945        self.check_pyobject_is_freed('check_pyobject_freed_is_freed')
946
947
948class PyMemMallocDebugTests(PyMemDebugTests):
949    PYTHONMALLOC = 'malloc_debug'
950
951
952@unittest.skipUnless(support.with_pymalloc(), 'need pymalloc')
953class PyMemPymallocDebugTests(PyMemDebugTests):
954    PYTHONMALLOC = 'pymalloc_debug'
955
956
957@unittest.skipUnless(Py_DEBUG, 'need Py_DEBUG')
958class PyMemDefaultTests(PyMemDebugTests):
959    # test default allocator of Python compiled in debug mode
960    PYTHONMALLOC = ''
961
962
963class Test_ModuleStateAccess(unittest.TestCase):
964    """Test access to module start (PEP 573)"""
965
966    # The C part of the tests lives in _testmultiphase, in a module called
967    # _testmultiphase_meth_state_access.
968    # This module has multi-phase initialization, unlike _testcapi.
969
970    def setUp(self):
971        fullname = '_testmultiphase_meth_state_access'  # XXX
972        origin = importlib.util.find_spec('_testmultiphase').origin
973        loader = importlib.machinery.ExtensionFileLoader(fullname, origin)
974        spec = importlib.util.spec_from_loader(fullname, loader)
975        module = importlib.util.module_from_spec(spec)
976        loader.exec_module(module)
977        self.module = module
978
979    def test_subclass_get_module(self):
980        """PyType_GetModule for defining_class"""
981        class StateAccessType_Subclass(self.module.StateAccessType):
982            pass
983
984        instance = StateAccessType_Subclass()
985        self.assertIs(instance.get_defining_module(), self.module)
986
987    def test_subclass_get_module_with_super(self):
988        class StateAccessType_Subclass(self.module.StateAccessType):
989            def get_defining_module(self):
990                return super().get_defining_module()
991
992        instance = StateAccessType_Subclass()
993        self.assertIs(instance.get_defining_module(), self.module)
994
995    def test_state_access(self):
996        """Checks methods defined with and without argument clinic
997
998        This tests a no-arg method (get_count) and a method with
999        both a positional and keyword argument.
1000        """
1001
1002        a = self.module.StateAccessType()
1003        b = self.module.StateAccessType()
1004
1005        methods = {
1006            'clinic': a.increment_count_clinic,
1007            'noclinic': a.increment_count_noclinic,
1008        }
1009
1010        for name, increment_count in methods.items():
1011            with self.subTest(name):
1012                self.assertEqual(a.get_count(), b.get_count())
1013                self.assertEqual(a.get_count(), 0)
1014
1015                increment_count()
1016                self.assertEqual(a.get_count(), b.get_count())
1017                self.assertEqual(a.get_count(), 1)
1018
1019                increment_count(3)
1020                self.assertEqual(a.get_count(), b.get_count())
1021                self.assertEqual(a.get_count(), 4)
1022
1023                increment_count(-2, twice=True)
1024                self.assertEqual(a.get_count(), b.get_count())
1025                self.assertEqual(a.get_count(), 0)
1026
1027                with self.assertRaises(TypeError):
1028                    increment_count(thrice=3)
1029
1030                with self.assertRaises(TypeError):
1031                    increment_count(1, 2, 3)
1032
1033
1034if __name__ == "__main__":
1035    unittest.main()
1036