• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Run the _testcapi module tests (tests for the Python/C API):  by defn,
2# these are all functions _testcapi exports whose name begins with 'test_'.
3
4from collections import OrderedDict
5import os
6import pickle
7import random
8import re
9import subprocess
10import sys
11import sysconfig
12import textwrap
13import threading
14import time
15import unittest
16from test import support
17from test.support import MISSING_C_DOCSTRINGS
18from test.support.script_helper import assert_python_failure, assert_python_ok
19try:
20    import _posixsubprocess
21except ImportError:
22    _posixsubprocess = None
23
24# Skip this test if the _testcapi module isn't available.
25_testcapi = support.import_module('_testcapi')
26
27# Were we compiled --with-pydebug or with #define Py_DEBUG?
28Py_DEBUG = hasattr(sys, 'gettotalrefcount')
29
30
31def testfunction(self):
32    """some doc"""
33    return self
34
35class InstanceMethod:
36    id = _testcapi.instancemethod(id)
37    testfunction = _testcapi.instancemethod(testfunction)
38
39class CAPITest(unittest.TestCase):
40
41    def test_instancemethod(self):
42        inst = InstanceMethod()
43        self.assertEqual(id(inst), inst.id())
44        self.assertTrue(inst.testfunction() is inst)
45        self.assertEqual(inst.testfunction.__doc__, testfunction.__doc__)
46        self.assertEqual(InstanceMethod.testfunction.__doc__, testfunction.__doc__)
47
48        InstanceMethod.testfunction.attribute = "test"
49        self.assertEqual(testfunction.attribute, "test")
50        self.assertRaises(AttributeError, setattr, inst.testfunction, "attribute", "test")
51
52    def test_no_FatalError_infinite_loop(self):
53        with support.SuppressCrashReport():
54            p = subprocess.Popen([sys.executable, "-c",
55                                  'import _testcapi;'
56                                  '_testcapi.crash_no_current_thread()'],
57                                 stdout=subprocess.PIPE,
58                                 stderr=subprocess.PIPE)
59        (out, err) = p.communicate()
60        self.assertEqual(out, b'')
61        # This used to cause an infinite loop.
62        self.assertTrue(err.rstrip().startswith(
63                         b'Fatal Python error:'
64                         b' PyThreadState_Get: no current thread'))
65
66    def test_memoryview_from_NULL_pointer(self):
67        self.assertRaises(ValueError, _testcapi.make_memoryview_from_NULL_pointer)
68
69    def test_exc_info(self):
70        raised_exception = ValueError("5")
71        new_exc = TypeError("TEST")
72        try:
73            raise raised_exception
74        except ValueError as e:
75            tb = e.__traceback__
76            orig_sys_exc_info = sys.exc_info()
77            orig_exc_info = _testcapi.set_exc_info(new_exc.__class__, new_exc, None)
78            new_sys_exc_info = sys.exc_info()
79            new_exc_info = _testcapi.set_exc_info(*orig_exc_info)
80            reset_sys_exc_info = sys.exc_info()
81
82            self.assertEqual(orig_exc_info[1], e)
83
84            self.assertSequenceEqual(orig_exc_info, (raised_exception.__class__, raised_exception, tb))
85            self.assertSequenceEqual(orig_sys_exc_info, orig_exc_info)
86            self.assertSequenceEqual(reset_sys_exc_info, orig_exc_info)
87            self.assertSequenceEqual(new_exc_info, (new_exc.__class__, new_exc, None))
88            self.assertSequenceEqual(new_sys_exc_info, new_exc_info)
89        else:
90            self.assertTrue(False)
91
92    @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.')
93    def test_seq_bytes_to_charp_array(self):
94        # Issue #15732: crash in _PySequence_BytesToCharpArray()
95        class Z(object):
96            def __len__(self):
97                return 1
98        self.assertRaises(TypeError, _posixsubprocess.fork_exec,
99                          1,Z(),3,(1, 2),5,6,7,8,9,10,11,12,13,14,15,16,17)
100        # Issue #15736: overflow in _PySequence_BytesToCharpArray()
101        class Z(object):
102            def __len__(self):
103                return sys.maxsize
104            def __getitem__(self, i):
105                return b'x'
106        self.assertRaises(MemoryError, _posixsubprocess.fork_exec,
107                          1,Z(),3,(1, 2),5,6,7,8,9,10,11,12,13,14,15,16,17)
108
109    @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.')
110    def test_subprocess_fork_exec(self):
111        class Z(object):
112            def __len__(self):
113                return 1
114
115        # Issue #15738: crash in subprocess_fork_exec()
116        self.assertRaises(TypeError, _posixsubprocess.fork_exec,
117                          Z(),[b'1'],3,(1, 2),5,6,7,8,9,10,11,12,13,14,15,16,17)
118
119    @unittest.skipIf(MISSING_C_DOCSTRINGS,
120                     "Signature information for builtins requires docstrings")
121    def test_docstring_signature_parsing(self):
122
123        self.assertEqual(_testcapi.no_docstring.__doc__, None)
124        self.assertEqual(_testcapi.no_docstring.__text_signature__, None)
125
126        self.assertEqual(_testcapi.docstring_empty.__doc__, None)
127        self.assertEqual(_testcapi.docstring_empty.__text_signature__, None)
128
129        self.assertEqual(_testcapi.docstring_no_signature.__doc__,
130            "This docstring has no signature.")
131        self.assertEqual(_testcapi.docstring_no_signature.__text_signature__, None)
132
133        self.assertEqual(_testcapi.docstring_with_invalid_signature.__doc__,
134            "docstring_with_invalid_signature($module, /, boo)\n"
135            "\n"
136            "This docstring has an invalid signature."
137            )
138        self.assertEqual(_testcapi.docstring_with_invalid_signature.__text_signature__, None)
139
140        self.assertEqual(_testcapi.docstring_with_invalid_signature2.__doc__,
141            "docstring_with_invalid_signature2($module, /, boo)\n"
142            "\n"
143            "--\n"
144            "\n"
145            "This docstring also has an invalid signature."
146            )
147        self.assertEqual(_testcapi.docstring_with_invalid_signature2.__text_signature__, None)
148
149        self.assertEqual(_testcapi.docstring_with_signature.__doc__,
150            "This docstring has a valid signature.")
151        self.assertEqual(_testcapi.docstring_with_signature.__text_signature__, "($module, /, sig)")
152
153        self.assertEqual(_testcapi.docstring_with_signature_but_no_doc.__doc__, None)
154        self.assertEqual(_testcapi.docstring_with_signature_but_no_doc.__text_signature__,
155            "($module, /, sig)")
156
157        self.assertEqual(_testcapi.docstring_with_signature_and_extra_newlines.__doc__,
158            "\nThis docstring has a valid signature and some extra newlines.")
159        self.assertEqual(_testcapi.docstring_with_signature_and_extra_newlines.__text_signature__,
160            "($module, /, parameter)")
161
162    def test_c_type_with_matrix_multiplication(self):
163        M = _testcapi.matmulType
164        m1 = M()
165        m2 = M()
166        self.assertEqual(m1 @ m2, ("matmul", m1, m2))
167        self.assertEqual(m1 @ 42, ("matmul", m1, 42))
168        self.assertEqual(42 @ m1, ("matmul", 42, m1))
169        o = m1
170        o @= m2
171        self.assertEqual(o, ("imatmul", m1, m2))
172        o = m1
173        o @= 42
174        self.assertEqual(o, ("imatmul", m1, 42))
175        o = 42
176        o @= m1
177        self.assertEqual(o, ("matmul", 42, m1))
178
179    def test_return_null_without_error(self):
180        # Issue #23571: A function must not return NULL without setting an
181        # error
182        if Py_DEBUG:
183            code = textwrap.dedent("""
184                import _testcapi
185                from test import support
186
187                with support.SuppressCrashReport():
188                    _testcapi.return_null_without_error()
189            """)
190            rc, out, err = assert_python_failure('-c', code)
191            self.assertRegex(err.replace(b'\r', b''),
192                             br'Fatal Python error: a function returned NULL '
193                                br'without setting an error\n'
194                             br'SystemError: <built-in function '
195                                 br'return_null_without_error> returned NULL '
196                                 br'without setting an error\n'
197                             br'\n'
198                             br'Current thread.*:\n'
199                             br'  File .*", line 6 in <module>')
200        else:
201            with self.assertRaises(SystemError) as cm:
202                _testcapi.return_null_without_error()
203            self.assertRegex(str(cm.exception),
204                             'return_null_without_error.* '
205                             'returned NULL without setting an error')
206
207    def test_return_result_with_error(self):
208        # Issue #23571: A function must not return a result with an error set
209        if Py_DEBUG:
210            code = textwrap.dedent("""
211                import _testcapi
212                from test import support
213
214                with support.SuppressCrashReport():
215                    _testcapi.return_result_with_error()
216            """)
217            rc, out, err = assert_python_failure('-c', code)
218            self.assertRegex(err.replace(b'\r', b''),
219                             br'Fatal Python error: a function returned a '
220                                br'result with an error set\n'
221                             br'ValueError\n'
222                             br'\n'
223                             br'The above exception was the direct cause '
224                                br'of the following exception:\n'
225                             br'\n'
226                             br'SystemError: <built-in '
227                                br'function return_result_with_error> '
228                                br'returned a result with an error set\n'
229                             br'\n'
230                             br'Current thread.*:\n'
231                             br'  File .*, line 6 in <module>')
232        else:
233            with self.assertRaises(SystemError) as cm:
234                _testcapi.return_result_with_error()
235            self.assertRegex(str(cm.exception),
236                             'return_result_with_error.* '
237                             'returned a result with an error set')
238
239    def test_buildvalue_N(self):
240        _testcapi.test_buildvalue_N()
241
242    def test_set_nomemory(self):
243        code = """if 1:
244            import _testcapi
245
246            class C(): pass
247
248            # The first loop tests both functions and that remove_mem_hooks()
249            # can be called twice in a row. The second loop checks a call to
250            # set_nomemory() after a call to remove_mem_hooks(). The third
251            # loop checks the start and stop arguments of set_nomemory().
252            for outer_cnt in range(1, 4):
253                start = 10 * outer_cnt
254                for j in range(100):
255                    if j == 0:
256                        if outer_cnt != 3:
257                            _testcapi.set_nomemory(start)
258                        else:
259                            _testcapi.set_nomemory(start, start + 1)
260                    try:
261                        C()
262                    except MemoryError as e:
263                        if outer_cnt != 3:
264                            _testcapi.remove_mem_hooks()
265                        print('MemoryError', outer_cnt, j)
266                        _testcapi.remove_mem_hooks()
267                        break
268        """
269        rc, out, err = assert_python_ok('-c', code)
270        self.assertIn(b'MemoryError 1 10', out)
271        self.assertIn(b'MemoryError 2 20', out)
272        self.assertIn(b'MemoryError 3 30', out)
273
274    def test_mapping_keys_values_items(self):
275        class Mapping1(dict):
276            def keys(self):
277                return list(super().keys())
278            def values(self):
279                return list(super().values())
280            def items(self):
281                return list(super().items())
282        class Mapping2(dict):
283            def keys(self):
284                return tuple(super().keys())
285            def values(self):
286                return tuple(super().values())
287            def items(self):
288                return tuple(super().items())
289        dict_obj = {'foo': 1, 'bar': 2, 'spam': 3}
290
291        for mapping in [{}, OrderedDict(), Mapping1(), Mapping2(),
292                        dict_obj, OrderedDict(dict_obj),
293                        Mapping1(dict_obj), Mapping2(dict_obj)]:
294            self.assertListEqual(_testcapi.get_mapping_keys(mapping),
295                                 list(mapping.keys()))
296            self.assertListEqual(_testcapi.get_mapping_values(mapping),
297                                 list(mapping.values()))
298            self.assertListEqual(_testcapi.get_mapping_items(mapping),
299                                 list(mapping.items()))
300
301    def test_mapping_keys_values_items_bad_arg(self):
302        self.assertRaises(AttributeError, _testcapi.get_mapping_keys, None)
303        self.assertRaises(AttributeError, _testcapi.get_mapping_values, None)
304        self.assertRaises(AttributeError, _testcapi.get_mapping_items, None)
305
306        class BadMapping:
307            def keys(self):
308                return None
309            def values(self):
310                return None
311            def items(self):
312                return None
313        bad_mapping = BadMapping()
314        self.assertRaises(TypeError, _testcapi.get_mapping_keys, bad_mapping)
315        self.assertRaises(TypeError, _testcapi.get_mapping_values, bad_mapping)
316        self.assertRaises(TypeError, _testcapi.get_mapping_items, bad_mapping)
317
318
319class TestPendingCalls(unittest.TestCase):
320
321    def pendingcalls_submit(self, l, n):
322        def callback():
323            #this function can be interrupted by thread switching so let's
324            #use an atomic operation
325            l.append(None)
326
327        for i in range(n):
328            time.sleep(random.random()*0.02) #0.01 secs on average
329            #try submitting callback until successful.
330            #rely on regular interrupt to flush queue if we are
331            #unsuccessful.
332            while True:
333                if _testcapi._pending_threadfunc(callback):
334                    break;
335
336    def pendingcalls_wait(self, l, n, context = None):
337        #now, stick around until l[0] has grown to 10
338        count = 0;
339        while len(l) != n:
340            #this busy loop is where we expect to be interrupted to
341            #run our callbacks.  Note that callbacks are only run on the
342            #main thread
343            if False and support.verbose:
344                print("(%i)"%(len(l),),)
345            for i in range(1000):
346                a = i*i
347            if context and not context.event.is_set():
348                continue
349            count += 1
350            self.assertTrue(count < 10000,
351                "timeout waiting for %i callbacks, got %i"%(n, len(l)))
352        if False and support.verbose:
353            print("(%i)"%(len(l),))
354
355    def test_pendingcalls_threaded(self):
356
357        #do every callback on a separate thread
358        n = 32 #total callbacks
359        threads = []
360        class foo(object):pass
361        context = foo()
362        context.l = []
363        context.n = 2 #submits per thread
364        context.nThreads = n // context.n
365        context.nFinished = 0
366        context.lock = threading.Lock()
367        context.event = threading.Event()
368
369        threads = [threading.Thread(target=self.pendingcalls_thread,
370                                    args=(context,))
371                   for i in range(context.nThreads)]
372        with support.start_threads(threads):
373            self.pendingcalls_wait(context.l, n, context)
374
375    def pendingcalls_thread(self, context):
376        try:
377            self.pendingcalls_submit(context.l, context.n)
378        finally:
379            with context.lock:
380                context.nFinished += 1
381                nFinished = context.nFinished
382                if False and support.verbose:
383                    print("finished threads: ", nFinished)
384            if nFinished == context.nThreads:
385                context.event.set()
386
387    def test_pendingcalls_non_threaded(self):
388        #again, just using the main thread, likely they will all be dispatched at
389        #once.  It is ok to ask for too many, because we loop until we find a slot.
390        #the loop can be interrupted to dispatch.
391        #there are only 32 dispatch slots, so we go for twice that!
392        l = []
393        n = 64
394        self.pendingcalls_submit(l, n)
395        self.pendingcalls_wait(l, n)
396
397
398class SubinterpreterTest(unittest.TestCase):
399
400    def test_subinterps(self):
401        import builtins
402        r, w = os.pipe()
403        code = """if 1:
404            import sys, builtins, pickle
405            with open({:d}, "wb") as f:
406                pickle.dump(id(sys.modules), f)
407                pickle.dump(id(builtins), f)
408            """.format(w)
409        with open(r, "rb") as f:
410            ret = support.run_in_subinterp(code)
411            self.assertEqual(ret, 0)
412            self.assertNotEqual(pickle.load(f), id(sys.modules))
413            self.assertNotEqual(pickle.load(f), id(builtins))
414
415
416class TestThreadState(unittest.TestCase):
417
418    @support.reap_threads
419    def test_thread_state(self):
420        # some extra thread-state tests driven via _testcapi
421        def target():
422            idents = []
423
424            def callback():
425                idents.append(threading.get_ident())
426
427            _testcapi._test_thread_state(callback)
428            a = b = callback
429            time.sleep(1)
430            # Check our main thread is in the list exactly 3 times.
431            self.assertEqual(idents.count(threading.get_ident()), 3,
432                             "Couldn't find main thread correctly in the list")
433
434        target()
435        t = threading.Thread(target=target)
436        t.start()
437        t.join()
438
439
440class Test_testcapi(unittest.TestCase):
441    locals().update((name, getattr(_testcapi, name))
442                    for name in dir(_testcapi)
443                    if name.startswith('test_') and not name.endswith('_code'))
444
445
446class PyMemDebugTests(unittest.TestCase):
447    PYTHONMALLOC = 'debug'
448    # '0x04c06e0' or '04C06E0'
449    PTR_REGEX = r'(?:0x)?[0-9a-fA-F]+'
450
451    def check(self, code):
452        with support.SuppressCrashReport():
453            out = assert_python_failure('-c', code,
454                                        PYTHONMALLOC=self.PYTHONMALLOC)
455        stderr = out.err
456        return stderr.decode('ascii', 'replace')
457
458    def test_buffer_overflow(self):
459        out = self.check('import _testcapi; _testcapi.pymem_buffer_overflow()')
460        regex = (r"Debug memory block at address p={ptr}: API 'm'\n"
461                 r"    16 bytes originally requested\n"
462                 r"    The [0-9] pad bytes at p-[0-9] are FORBIDDENBYTE, as expected.\n"
463                 r"    The [0-9] pad bytes at tail={ptr} are not all FORBIDDENBYTE \(0x[0-9a-f]{{2}}\):\n"
464                 r"        at tail\+0: 0x78 \*\*\* OUCH\n"
465                 r"        at tail\+1: 0xfb\n"
466                 r"        at tail\+2: 0xfb\n"
467                 r"        .*\n"
468                 r"    The block was made by call #[0-9]+ to debug malloc/realloc.\n"
469                 r"    Data at p: cb cb cb .*\n"
470                 r"\n"
471                 r"Enable tracemalloc to get the memory block allocation traceback\n"
472                 r"\n"
473                 r"Fatal Python error: bad trailing pad byte")
474        regex = regex.format(ptr=self.PTR_REGEX)
475        regex = re.compile(regex, flags=re.DOTALL)
476        self.assertRegex(out, regex)
477
478    def test_api_misuse(self):
479        out = self.check('import _testcapi; _testcapi.pymem_api_misuse()')
480        regex = (r"Debug memory block at address p={ptr}: API 'm'\n"
481                 r"    16 bytes originally requested\n"
482                 r"    The [0-9] pad bytes at p-[0-9] are FORBIDDENBYTE, as expected.\n"
483                 r"    The [0-9] pad bytes at tail={ptr} are FORBIDDENBYTE, as expected.\n"
484                 r"    The block was made by call #[0-9]+ to debug malloc/realloc.\n"
485                 r"    Data at p: cb cb cb .*\n"
486                 r"\n"
487                 r"Enable tracemalloc to get the memory block allocation traceback\n"
488                 r"\n"
489                 r"Fatal Python error: bad ID: Allocated using API 'm', verified using API 'r'\n")
490        regex = regex.format(ptr=self.PTR_REGEX)
491        self.assertRegex(out, regex)
492
493    def check_malloc_without_gil(self, code):
494        out = self.check(code)
495        expected = ('Fatal Python error: Python memory allocator called '
496                    'without holding the GIL')
497        self.assertIn(expected, out)
498
499    def test_pymem_malloc_without_gil(self):
500        # Debug hooks must raise an error if PyMem_Malloc() is called
501        # without holding the GIL
502        code = 'import _testcapi; _testcapi.pymem_malloc_without_gil()'
503        self.check_malloc_without_gil(code)
504
505    def test_pyobject_malloc_without_gil(self):
506        # Debug hooks must raise an error if PyObject_Malloc() is called
507        # without holding the GIL
508        code = 'import _testcapi; _testcapi.pyobject_malloc_without_gil()'
509        self.check_malloc_without_gil(code)
510
511
512class PyMemMallocDebugTests(PyMemDebugTests):
513    PYTHONMALLOC = 'malloc_debug'
514
515
516@unittest.skipUnless(support.with_pymalloc(), 'need pymalloc')
517class PyMemPymallocDebugTests(PyMemDebugTests):
518    PYTHONMALLOC = 'pymalloc_debug'
519
520
521@unittest.skipUnless(Py_DEBUG, 'need Py_DEBUG')
522class PyMemDefaultTests(PyMemDebugTests):
523    # test default allocator of Python compiled in debug mode
524    PYTHONMALLOC = ''
525
526
527if __name__ == "__main__":
528    unittest.main()
529