# Run the _testcapi module tests (tests for the Python/C API): by definition,
# these are all functions _testcapi exports whose name begins with 'test_'.
3
4 from collections import OrderedDict
5 import _thread
6 import contextlib
7 import importlib.machinery
8 import importlib.util
9 import os
10 import pickle
11 import random
12 import re
13 import subprocess
14 import sys
15 import textwrap
16 import threading
17 import time
18 import unittest
19 import weakref
20 from test import support
21 from test.support import MISSING_C_DOCSTRINGS
22 from test.support import import_helper
23 from test.support import threading_helper
24 from test.support import warnings_helper
25 from test.support.script_helper import assert_python_failure, assert_python_ok
26 try:
27 import _posixsubprocess
28 except ImportError:
29 _posixsubprocess = None
30 try:
31 import _testmultiphase
32 except ImportError:
33 _testmultiphase = None
34
35 # Skip this test if the _testcapi module isn't available.
36 _testcapi = import_helper.import_module('_testcapi')
37
38 import _testinternalcapi
39
40 # Were we compiled --with-pydebug or with #define Py_DEBUG?
41 Py_DEBUG = hasattr(sys, 'gettotalrefcount')
42
43
44 NULL = None
45
def decode_stderr(err):
    """Return *err* decoded as UTF-8 with every carriage return removed.

    Bytes that are not valid UTF-8 are substituted instead of raising,
    because the 'replace' error handler is used.
    """
    text = err.decode('utf-8', 'replace')
    return text.replace('\r', '')
48
49
def testfunction(self):
    """some doc"""
    # Identity helper: the argument is handed straight back to the caller.
    return self
53
54
class InstanceMethod:
    # Expose the builtin id() and the module-level testfunction() as
    # attributes of this class, each wrapped with _testcapi.instancemethod.
    id = _testcapi.instancemethod(id)
    testfunction = _testcapi.instancemethod(testfunction)
58
59 class ESC[4;38;5;81mCAPITest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
60
def test_instancemethod(self):
    # Check the _testcapi.instancemethod wrappers held by InstanceMethod.
    inst = InstanceMethod()
    self.assertEqual(id(inst), inst.id())
    # assertIs reports both operands on failure, unlike assertTrue(a is b).
    self.assertIs(inst.testfunction(), inst)
    self.assertEqual(inst.testfunction.__doc__, testfunction.__doc__)
    self.assertEqual(InstanceMethod.testfunction.__doc__, testfunction.__doc__)

    # An attribute set through the class-level wrapper is expected to show
    # up on the wrapped function ...
    InstanceMethod.testfunction.attribute = "test"
    self.assertEqual(testfunction.attribute, "test")
    # ... while setting it through the instance is expected to be refused.
    self.assertRaises(AttributeError, setattr, inst.testfunction, "attribute", "test")
71
@support.requires_subprocess()
def test_no_FatalError_infinite_loop(self):
    # Run crash_no_current_thread() in a child interpreter and inspect what
    # the child wrote before dying.
    child_source = ('import _testcapi;'
                    '_testcapi.crash_no_current_thread()')
    fatal_prefix = (b'Fatal Python error: '
                    b'PyThreadState_Get: '
                    b'the function must be called with the GIL held, '
                    b'but the GIL is released '
                    b'(the current Python thread state is NULL)')

    with support.SuppressCrashReport():
        child = subprocess.Popen([sys.executable, "-c", child_source],
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
    out, err = child.communicate()

    self.assertEqual(out, b'')
    # This used to cause an infinite loop.
    self.assertTrue(err.rstrip().startswith(fatal_prefix), err)
90
def test_memoryview_from_NULL_pointer(self):
    # The C helper is expected to raise ValueError rather than return a
    # memoryview.
    with self.assertRaises(ValueError):
        _testcapi.make_memoryview_from_NULL_pointer()
93
def test_exception(self):
    # Swap the exception being handled via _testcapi.set_exception() and
    # swap it back, sampling sys.exception() before, between and after.
    raised_exception = ValueError("5")
    replacement = TypeError("TEST")
    try:
        raise raised_exception
    except ValueError as caught:
        seen_before = sys.exception()
        previous = _testcapi.set_exception(replacement)
        seen_between = sys.exception()
        swapped_out = _testcapi.set_exception(previous)
        seen_after = sys.exception()

        self.assertEqual(previous, caught)

        self.assertEqual(previous, raised_exception)
        self.assertEqual(seen_before, previous)
        self.assertEqual(seen_after, previous)
        self.assertEqual(swapped_out, replacement)
        self.assertEqual(seen_between, swapped_out)
    else:
        self.fail("Exception not raised")
115
def test_exc_info(self):
    # Swap the handled exception with _testcapi.set_exc_info() and swap it
    # back, checking sys.exc_info() at every step.
    raised_exception = ValueError("5")
    new_exc = TypeError("TEST")
    try:
        raise raised_exception
    except ValueError as e:
        tb = e.__traceback__
        orig_sys_exc_info = sys.exc_info()
        orig_exc_info = _testcapi.set_exc_info(new_exc.__class__, new_exc, None)
        new_sys_exc_info = sys.exc_info()
        new_exc_info = _testcapi.set_exc_info(*orig_exc_info)
        reset_sys_exc_info = sys.exc_info()

        self.assertEqual(orig_exc_info[1], e)

        self.assertSequenceEqual(orig_exc_info, (raised_exception.__class__, raised_exception, tb))
        self.assertSequenceEqual(orig_sys_exc_info, orig_exc_info)
        self.assertSequenceEqual(reset_sys_exc_info, orig_exc_info)
        self.assertSequenceEqual(new_exc_info, (new_exc.__class__, new_exc, None))
        self.assertSequenceEqual(new_sys_exc_info, new_exc_info)
    else:
        # Only reached if the raise above did not raise; fail with an
        # explicit message, the same way test_exception does.
        self.fail("Exception not raised")
138
def test_set_object(self):
    # new exception as obj is not an exception
    with self.assertRaises(ValueError) as cm:
        _testcapi.exc_set_object(ValueError, 42)
    self.assertEqual(cm.exception.args, (42,))

    # wraps the exception because unrelated types
    with self.assertRaises(ValueError) as cm:
        _testcapi.exc_set_object(ValueError, TypeError(1,2,3))
    inner = cm.exception.args[0]
    self.assertIsInstance(inner, TypeError)
    self.assertEqual(inner.args, (1, 2, 3))

    # is superclass, so does not wrap
    with self.assertRaises(PermissionError) as cm:
        _testcapi.exc_set_object(OSError, PermissionError(24))
    self.assertEqual(cm.exception.args, (24,))

    # An exception class whose metaclass raises from __subclasscheck__:
    # that ZeroDivisionError is what the call is expected to raise.
    class Meta(type):
        def __subclasscheck__(cls, sub):
            1/0

    class Broken(Exception, metaclass=Meta):
        pass

    with self.assertRaises(ZeroDivisionError):
        _testcapi.exc_set_object(Broken, Broken())
166
@unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.')
def test_seq_bytes_to_charp_array(self):
    # Positional arguments that follow the first two in both fork_exec()
    # calls below; they are identical in each call.
    rest = (True, (1, 2), 5, 6, 7, 8, 9, 10, 11, 12, 13, 14,
            True, True, 17, False, 19, 20, 21, 22, False)

    # Issue #15732: crash in _PySequence_BytesToCharpArray()
    class Z(object):
        def __len__(self):
            return 1
    with self.assertRaisesRegex(TypeError, 'indexing'):
        _posixsubprocess.fork_exec(1, Z(), *rest)

    # Issue #15736: overflow in _PySequence_BytesToCharpArray()
    class Z(object):
        def __len__(self):
            return sys.maxsize
        def __getitem__(self, i):
            return b'x'
    with self.assertRaises(MemoryError):
        _posixsubprocess.fork_exec(1, Z(), *rest)
184
@unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.')
def test_subprocess_fork_exec(self):
    # An object that only defines __len__, passed as the first argument.
    class Z(object):
        def __len__(self):
            return 1

    # Issue #15738: crash in subprocess_fork_exec()
    with self.assertRaises(TypeError):
        _posixsubprocess.fork_exec(
            Z(), [b'1'], True, (1, 2), 5, 6, 7, 8, 9, 10, 11, 12, 13, 14,
            True, True, 17, False, 19, 20, 21, 22, False)
194
@unittest.skipIf(MISSING_C_DOCSTRINGS,
                 "Signature information for builtins requires docstrings")
def test_docstring_signature_parsing(self):
    # Check how __doc__ and __text_signature__ are derived for a series of
    # _testcapi functions.  Values expected to be absent are checked with
    # assertIsNone, which tests identity with None and gives a clearer
    # failure message than assertEqual(..., None).

    self.assertIsNone(_testcapi.no_docstring.__doc__)
    self.assertIsNone(_testcapi.no_docstring.__text_signature__)

    self.assertIsNone(_testcapi.docstring_empty.__doc__)
    self.assertIsNone(_testcapi.docstring_empty.__text_signature__)

    self.assertEqual(_testcapi.docstring_no_signature.__doc__,
        "This docstring has no signature.")
    self.assertIsNone(_testcapi.docstring_no_signature.__text_signature__)

    self.assertEqual(_testcapi.docstring_with_invalid_signature.__doc__,
        "docstring_with_invalid_signature($module, /, boo)\n"
        "\n"
        "This docstring has an invalid signature."
        )
    self.assertIsNone(_testcapi.docstring_with_invalid_signature.__text_signature__)

    self.assertEqual(_testcapi.docstring_with_invalid_signature2.__doc__,
        "docstring_with_invalid_signature2($module, /, boo)\n"
        "\n"
        "--\n"
        "\n"
        "This docstring also has an invalid signature."
        )
    self.assertIsNone(_testcapi.docstring_with_invalid_signature2.__text_signature__)

    self.assertEqual(_testcapi.docstring_with_signature.__doc__,
        "This docstring has a valid signature.")
    self.assertEqual(_testcapi.docstring_with_signature.__text_signature__, "($module, /, sig)")

    self.assertIsNone(_testcapi.docstring_with_signature_but_no_doc.__doc__)
    self.assertEqual(_testcapi.docstring_with_signature_but_no_doc.__text_signature__,
        "($module, /, sig)")

    self.assertEqual(_testcapi.docstring_with_signature_and_extra_newlines.__doc__,
        "\nThis docstring has a valid signature and some extra newlines.")
    self.assertEqual(_testcapi.docstring_with_signature_and_extra_newlines.__text_signature__,
        "($module, /, parameter)")
237
def test_c_type_with_matrix_multiplication(self):
    # Exercise @ and @= on _testcapi.matmulType; the results are compared
    # against tuples naming the operation and its two operands.
    matmul_type = _testcapi.matmulType
    left = matmul_type()
    right = matmul_type()

    # Binary operator, with the C type on either side.
    self.assertEqual(left @ right, ("matmul", left, right))
    self.assertEqual(left @ 42, ("matmul", left, 42))
    self.assertEqual(42 @ left, ("matmul", 42, left))

    # In-place operator with the C type as the target.
    target = left
    target @= right
    self.assertEqual(target, ("imatmul", left, right))
    target = left
    target @= 42
    self.assertEqual(target, ("imatmul", left, 42))

    # In-place operator with an int target: "matmul" is the expected tag.
    target = 42
    target @= left
    self.assertEqual(target, ("matmul", 42, left))
254
def test_c_type_with_ipow(self):
    # When the __ipow__ method of a type was implemented in C, using the
    # modulo param would cause segfaults.
    obj = _testcapi.ipowType()
    cases = (
        ((1,), (1, None)),
        ((2, 2), (2, 2)),
    )
    for args, expected in cases:
        self.assertEqual(obj.__ipow__(*args), expected)
261
def test_return_null_without_error(self):
    # Issue #23571: A function must not return NULL without setting an
    # error
    if Py_DEBUG:
        # With Py_DEBUG the call is expected to abort the interpreter, so
        # it runs in a child process and the stderr dump is inspected.
        code = textwrap.dedent("""
            import _testcapi
            from test import support

            with support.SuppressCrashReport():
                _testcapi.return_null_without_error()
        """)
        rc, out, err = assert_python_failure('-c', code)
        err = decode_stderr(err)
        # Frame lines in the dumped traceback are indented by two spaces,
        # so the final pattern must start with two spaces as well.
        self.assertRegex(err,
            r'Fatal Python error: _Py_CheckFunctionResult: '
                r'a function returned NULL without setting an exception\n'
            r'Python runtime state: initialized\n'
            r'SystemError: <built-in function return_null_without_error> '
                r'returned NULL without setting an exception\n'
            r'\n'
            r'Current thread.*:\n'
            r'  File .*", line 6 in <module>\n')
    else:
        with self.assertRaises(SystemError) as cm:
            _testcapi.return_null_without_error()
        self.assertRegex(str(cm.exception),
                         'return_null_without_error.* '
                         'returned NULL without setting an exception')
290
def test_return_result_with_error(self):
    # Issue #23571: A function must not return a result with an error set
    if Py_DEBUG:
        # With Py_DEBUG the call is expected to abort the interpreter, so
        # it runs in a child process and the stderr dump is inspected.
        code = textwrap.dedent("""
            import _testcapi
            from test import support

            with support.SuppressCrashReport():
                _testcapi.return_result_with_error()
        """)
        rc, out, err = assert_python_failure('-c', code)
        err = decode_stderr(err)
        # Frame lines in the dumped traceback are indented by two spaces,
        # so the final pattern must start with two spaces as well.
        self.assertRegex(err,
                r'Fatal Python error: _Py_CheckFunctionResult: '
                    r'a function returned a result with an exception set\n'
                r'Python runtime state: initialized\n'
                r'ValueError\n'
                r'\n'
                r'The above exception was the direct cause '
                    r'of the following exception:\n'
                r'\n'
                r'SystemError: <built-in '
                    r'function return_result_with_error> '
                    r'returned a result with an exception set\n'
                r'\n'
                r'Current thread.*:\n'
                r'  File .*, line 6 in <module>\n')
    else:
        with self.assertRaises(SystemError) as cm:
            _testcapi.return_result_with_error()
        self.assertRegex(str(cm.exception),
                         'return_result_with_error.* '
                         'returned a result with an exception set')
324
def test_getitem_with_error(self):
    # Test _Py_CheckSlotResult(). Raise an exception and then calls
    # PyObject_GetItem(): check that the assertion catches the bug.
    # PyObject_GetItem() must not be called with an exception set.
    code = textwrap.dedent("""
        import _testcapi
        from test import support

        with support.SuppressCrashReport():
            _testcapi.getitem_with_error({1: 2}, 1)
    """)
    rc, out, err = assert_python_failure('-c', code)
    err = decode_stderr(err)
    if 'SystemError: ' not in err:
        # Frame lines in the dumped traceback are indented by two spaces,
        # so the 'File' pattern must start with two spaces as well.
        self.assertRegex(err,
                r'Fatal Python error: _Py_CheckSlotResult: '
                    r'Slot __getitem__ of type dict succeeded '
                    r'with an exception set\n'
                r'Python runtime state: initialized\n'
                r'ValueError: bug\n'
                r'\n'
                r'Current thread .* \(most recent call first\):\n'
                r'  File .*, line 6 in <module>\n'
                r'\n'
                r'Extension modules: _testcapi \(total: 1\)\n')
    else:
        # Python built with NDEBUG macro defined:
        # test _Py_CheckFunctionResult() instead.
        self.assertIn('returned a result with an exception set', err)
354
def test_buildvalue_N(self):
    # Simply invoke the C-level helper; this method makes no assertions of
    # its own.
    run_check = _testcapi.test_buildvalue_N
    run_check()
357
def test_set_nomemory(self):
    # Run a script that arms _testcapi.set_nomemory() in three different
    # ways and prints one 'MemoryError <outer_cnt> <j>' line each time
    # allocating C() fails; then validate every printed line.
    code = """if 1:
            import _testcapi

            class C(): pass

            # The first loop tests both functions and that remove_mem_hooks()
            # can be called twice in a row. The second loop checks a call to
            # set_nomemory() after a call to remove_mem_hooks(). The third
            # loop checks the start and stop arguments of set_nomemory().
            for outer_cnt in range(1, 4):
                start = 10 * outer_cnt
                for j in range(100):
                    if j == 0:
                        if outer_cnt != 3:
                            _testcapi.set_nomemory(start)
                        else:
                            _testcapi.set_nomemory(start, start + 1)
                    try:
                        C()
                    except MemoryError as e:
                        if outer_cnt != 3:
                            _testcapi.remove_mem_hooks()
                        print('MemoryError', outer_cnt, j)
                        _testcapi.remove_mem_hooks()
                        break
        """
    rc, out, err = assert_python_ok('-c', code)
    lines = out.splitlines()
    for i, line in enumerate(lines, 1):
        # Check the individual line, not the whole output: testing `out`
        # would pass for every line as soon as any one line matched.
        self.assertIn(b'MemoryError', line)
        # The last field is the inner-loop index at which allocation failed.
        *_, count = line.split(b' ')
        count = int(count)
        self.assertLessEqual(count, i*5)
        self.assertGreaterEqual(count, i*5-2)
393
def test_mapping_keys_values_items(self):
    # dict subclasses whose keys()/values()/items() return lists ...
    class Mapping1(dict):
        def keys(self):
            return list(super().keys())
        def values(self):
            return list(super().values())
        def items(self):
            return list(super().items())
    # ... or tuples, instead of dict views.
    class Mapping2(dict):
        def keys(self):
            return tuple(super().keys())
        def values(self):
            return tuple(super().values())
        def items(self):
            return tuple(super().items())
    dict_obj = {'foo': 1, 'bar': 2, 'spam': 3}

    # Empty and populated variants of each mapping flavour.
    candidates = [
        {}, OrderedDict(), Mapping1(), Mapping2(),
        dict_obj, OrderedDict(dict_obj),
        Mapping1(dict_obj), Mapping2(dict_obj),
    ]
    for mapping in candidates:
        got_keys = _testcapi.get_mapping_keys(mapping)
        self.assertListEqual(got_keys, list(mapping.keys()))
        got_values = _testcapi.get_mapping_values(mapping)
        self.assertListEqual(got_values, list(mapping.values()))
        got_items = _testcapi.get_mapping_items(mapping)
        self.assertListEqual(got_items, list(mapping.items()))
420
def test_mapping_keys_values_items_bad_arg(self):
    getters = (
        _testcapi.get_mapping_keys,
        _testcapi.get_mapping_values,
        _testcapi.get_mapping_items,
    )

    # None: AttributeError is expected from each getter.
    for getter in getters:
        with self.assertRaises(AttributeError):
            getter(None)

    # Methods exist but return None: TypeError is expected instead.
    class BadMapping:
        def keys(self):
            return None
        def values(self):
            return None
        def items(self):
            return None
    bad_mapping = BadMapping()
    for getter in getters:
        with self.assertRaises(TypeError):
            getter(bad_mapping)
437
def test_mapping_has_key(self):
    has_key = _testcapi.mapping_has_key

    # Plain dict.
    plain = {'a': 1}
    self.assertTrue(has_key(plain, 'a'))
    self.assertFalse(has_key(plain, 'b'))

    # dict subclass holding the same content.
    class SubDict(dict):
        pass

    derived = SubDict({'a': 1})
    self.assertTrue(has_key(derived, 'a'))
    self.assertFalse(has_key(derived, 'b'))
449
def test_sequence_set_slice(self):
    set_slice = _testcapi.sequence_set_slice

    # Correct case:
    items = [1, 2, 3, 4, 5]
    reference = items.copy()

    set_slice(items, 1, 3, [8, 9])
    reference[1:3] = [8, 9]
    self.assertEqual(items, reference)
    self.assertEqual(items, [1, 8, 9, 4, 5])

    # Custom class: __setitem__ records what it was called with.
    class Custom:
        def __setitem__(self, index, value):
            self.index = index
            self.value = value

    recorder = Custom()
    set_slice(recorder, 0, 5, 'abc')
    self.assertEqual(recorder.index, slice(0, 5))
    self.assertEqual(recorder.value, 'abc')

    # Immutable sequences must raise:
    frozen_tuple = (1, 2, 3, 4)
    with self.assertRaises(TypeError):
        set_slice(frozen_tuple, 1, 3, (8, 9))
    self.assertEqual(frozen_tuple, (1, 2, 3, 4))

    frozen_str = 'abcd'
    with self.assertRaises(TypeError):
        set_slice(frozen_str, 1, 3, 'xy')
    self.assertEqual(frozen_str, 'abcd')

    # Not a sequence:
    with self.assertRaises(TypeError):
        set_slice(None, 1, 3, 'xy')

    mapping = {1: 'a', 2: 'b', 3: 'c'}
    with self.assertRaises(TypeError):
        set_slice(mapping, 1, 3, 'xy')
    self.assertEqual(mapping, {1: 'a', 2: 'b', 3: 'c'})
490
def test_sequence_del_slice(self):
    del_slice = _testcapi.sequence_del_slice

    # Correct case:
    items = [1, 2, 3, 4, 5]
    reference = items.copy()

    del_slice(items, 1, 3)
    del reference[1:3]
    self.assertEqual(items, reference)
    self.assertEqual(items, [1, 4, 5])

    # Custom class: __delitem__ records what it was called with.
    class Custom:
        def __delitem__(self, index):
            self.index = index

    recorder = Custom()
    del_slice(recorder, 0, 5)
    self.assertEqual(recorder.index, slice(0, 5))

    # Immutable sequences must raise:
    frozen_tuple = (1, 2, 3, 4)
    with self.assertRaises(TypeError):
        del_slice(frozen_tuple, 1, 3)
    self.assertEqual(frozen_tuple, (1, 2, 3, 4))

    frozen_str = 'abcd'
    with self.assertRaises(TypeError):
        del_slice(frozen_str, 1, 3)
    self.assertEqual(frozen_str, 'abcd')

    # Not a sequence:
    with self.assertRaises(TypeError):
        del_slice(None, 1, 3)

    mapping = {1: 'a', 2: 'b', 3: 'c'}
    with self.assertRaises(TypeError):
        del_slice(mapping, 1, 3)
    self.assertEqual(mapping, {1: 'a', 2: 'b', 3: 'c'})
529
@unittest.skipUnless(hasattr(_testcapi, 'negative_refcount'),
                     'need _testcapi.negative_refcount')
def test_negative_refcount(self):
    # bpo-35059: Check that Py_DECREF() reports the correct filename
    # when calling _Py_NegativeRefcount() to abort Python.
    script = textwrap.dedent("""
        import _testcapi
        from test import support

        with support.SuppressCrashReport():
            _testcapi.negative_refcount()
    """)
    expected = (br'_testcapimodule\.c:[0-9]+: '
                br'_Py_NegativeRefcount: Assertion failed: '
                br'object has negative ref count')
    _rc, _out, stderr = assert_python_failure('-c', script)
    self.assertRegex(stderr, expected)
547
def test_trashcan_subclass(self):
    # bpo-35983: Check that the trashcan mechanism for "list" is NOT
    # activated when its tp_dealloc is being called by a subclass
    from _testcapi import MyList
    # Build a 1000-deep chain in which each MyList holds the previous one.
    nested = None
    for _ in range(1000):
        nested = MyList((nested,))
555
@support.requires_resource('cpu')
def test_trashcan_python_class1(self):
    # Run the shared trashcan check with the builtin list as base class.
    base_type = list
    self.do_test_trashcan_python_class(base_type)
559
@support.requires_resource('cpu')
def test_trashcan_python_class2(self):
    # Run the shared trashcan check with _testcapi.MyList as base class.
    from _testcapi import MyList as base_type
    self.do_test_trashcan_python_class(base_type)
564
def do_test_trashcan_python_class(self, base, depth=2**20):
    """Build and drop deeply nested instances of a Python subclass of *base*.

    Check that the trashcan mechanism works properly for a Python
    subclass of a class using the trashcan (this specific test assumes
    that the base class "base" behaves like list).

    base  -- class to derive the counting subclass from.
    depth -- number of nested instances built per pass; must be at least 1,
             otherwise the "instances exist" assertion fails.  The default
             is in the order of 2**20 so that a typical 8MB stack would
             overflow without the trashcan; a smaller value gives a cheaper
             run.
    """
    class PyList(base):
        # Count the number of PyList instances to verify that there is
        # no memory leak
        num = 0
        def __init__(self, *args):
            __class__.num += 1
            super().__init__(*args)
        def __del__(self):
            __class__.num -= 1

    # Two passes: one dropping the chain directly, one dropping it through
    # an extra enclosing tuple.
    for parity in (0, 1):
        L = None
        for i in range(depth):
            L = PyList((L,))
            L.attr = i
        if parity:
            # Add one additional nesting layer
            L = (L,)
        self.assertGreater(PyList.num, 0)
        del L
        self.assertEqual(PyList.num, 0)
592
def test_heap_ctype_doc_and_text_signature(self):
    # Check both docstring-derived attributes of HeapDocCType.
    heap_type = _testcapi.HeapDocCType
    self.assertEqual(heap_type.__doc__, "somedoc")
    self.assertEqual(heap_type.__text_signature__, "(arg1, arg2)")
596
def test_null_type_doc(self):
    # NullTpDocType is expected to expose no docstring at all; assertIsNone
    # checks identity with None and reports the actual value on failure.
    self.assertIsNone(_testcapi.NullTpDocType.__doc__)
599
def test_subclass_of_heap_gc_ctype_with_tpdealloc_decrefs_once(self):
    class HeapGcCTypeSubclass(_testcapi.HeapGcCType):
        def __init__(self):
            self.value2 = 20
            super().__init__()

    instance = HeapGcCTypeSubclass()
    # Refcount of the type while exactly one instance is alive.
    refs_with_instance = sys.getrefcount(HeapGcCTypeSubclass)

    # Test that subclass instance was fully created
    self.assertEqual(instance.value, 10)
    self.assertEqual(instance.value2, 20)

    # Test that the type reference count is only decremented once
    del instance
    self.assertEqual(refs_with_instance - 1, sys.getrefcount(HeapGcCTypeSubclass))
616
def test_subclass_of_heap_gc_ctype_with_del_modifying_dunder_class_only_decrefs_once(self):
    class A(_testcapi.HeapGcCType):
        def __init__(self):
            self.value2 = 20
            super().__init__()

    class B(A):
        def __init__(self):
            super().__init__()

        def __del__(self):
            # Re-class the dying object and record both types' refcounts
            # as seen from inside the finalizer.
            self.__class__ = A
            A.refcnt_in_del = sys.getrefcount(A)
            B.refcnt_in_del = sys.getrefcount(B)

    instance = B()
    refs_b = sys.getrefcount(B)
    refs_a = sys.getrefcount(A)

    # Test that subclass instance was fully created
    self.assertEqual(instance.value, 10)
    self.assertEqual(instance.value2, 20)

    del instance

    # Test that setting __class__ modified the reference counts of the types
    if Py_DEBUG:
        # gh-89373: In debug mode, _Py_Dealloc() keeps a strong reference
        # to the type while calling tp_dealloc()
        self.assertEqual(refs_b, B.refcnt_in_del)
    else:
        self.assertEqual(refs_b - 1, B.refcnt_in_del)
    self.assertEqual(refs_a + 1, A.refcnt_in_del)

    # Test that the original type already has decreased its refcnt
    self.assertEqual(refs_b - 1, sys.getrefcount(B))

    # Test that subtype_dealloc decref the newly assigned __class__ only once
    self.assertEqual(refs_a, sys.getrefcount(A))
656
def test_heaptype_with_dict(self):
    # An attribute set on the instance must be visible through both
    # dictobj and __dict__.
    obj = _testcapi.HeapCTypeWithDict()
    obj.foo = 42
    self.assertEqual(obj.foo, 42)
    self.assertEqual(obj.dictobj, obj.__dict__)
    self.assertEqual(obj.dictobj, {"foo": 42})

    # A new instance starts out with an empty __dict__.
    obj = _testcapi.HeapCTypeWithDict()
    self.assertEqual({}, obj.__dict__)
666
def test_heaptype_with_negative_dict(self):
    # An attribute set on the instance must be visible through both
    # dictobj and __dict__.
    obj = _testcapi.HeapCTypeWithNegativeDict()
    obj.foo = 42
    self.assertEqual(obj.foo, 42)
    self.assertEqual(obj.dictobj, obj.__dict__)
    self.assertEqual(obj.dictobj, {"foo": 42})

    # A new instance starts out with an empty __dict__.
    obj = _testcapi.HeapCTypeWithNegativeDict()
    self.assertEqual({}, obj.__dict__)
676
def test_heaptype_with_weakref(self):
    target = _testcapi.HeapCTypeWithWeakref()
    wr = weakref.ref(target)
    # The weak reference resolves to the instance, and the instance's
    # weakreflist attribute compares equal to that weak reference.
    self.assertEqual(wr(), target)
    self.assertEqual(target.weakreflist, wr)
682
def test_heaptype_with_buffer(self):
    # bytes() of an instance is expected to yield b"1234".
    exporter = _testcapi.HeapCTypeWithBuffer()
    self.assertEqual(bytes(exporter), b"1234")
687
def test_c_subclass_of_heap_ctype_with_tpdealloc_decrefs_once(self):
    instance = _testcapi.HeapCTypeSubclass()
    # Refcount of the type while exactly one instance is alive.
    refs_with_instance = sys.getrefcount(_testcapi.HeapCTypeSubclass)

    # Test that subclass instance was fully created
    self.assertEqual(instance.value, 10)
    self.assertEqual(instance.value2, 20)

    # Test that the type reference count is only decremented once
    del instance
    self.assertEqual(refs_with_instance - 1, sys.getrefcount(_testcapi.HeapCTypeSubclass))
699
def test_c_subclass_of_heap_ctype_with_del_modifying_dunder_class_only_decrefs_once(self):
    instance = _testcapi.HeapCTypeSubclassWithFinalizer()
    refs_finalizer_type = sys.getrefcount(_testcapi.HeapCTypeSubclassWithFinalizer)
    refs_plain_type = sys.getrefcount(_testcapi.HeapCTypeSubclass)

    # Test that subclass instance was fully created
    self.assertEqual(instance.value, 10)
    self.assertEqual(instance.value2, 20)

    # The tp_finalize slot will set __class__ to HeapCTypeSubclass
    del instance

    # Test that setting __class__ modified the reference counts of the types
    if Py_DEBUG:
        # gh-89373: In debug mode, _Py_Dealloc() keeps a strong reference
        # to the type while calling tp_dealloc()
        self.assertEqual(refs_finalizer_type, _testcapi.HeapCTypeSubclassWithFinalizer.refcnt_in_del)
    else:
        self.assertEqual(refs_finalizer_type - 1, _testcapi.HeapCTypeSubclassWithFinalizer.refcnt_in_del)
    self.assertEqual(refs_plain_type + 1, _testcapi.HeapCTypeSubclass.refcnt_in_del)

    # Test that the original type already has decreased its refcnt
    self.assertEqual(refs_finalizer_type - 1, sys.getrefcount(_testcapi.HeapCTypeSubclassWithFinalizer))

    # Test that subtype_dealloc decref the newly assigned __class__ only once
    self.assertEqual(refs_plain_type, sys.getrefcount(_testcapi.HeapCTypeSubclass))
726
def test_heaptype_with_setattro(self):
    # pvalue is expected to read 10 initially, follow an assignment to
    # value, and read 0 once value is deleted.
    target = _testcapi.HeapCTypeSetattr()
    self.assertEqual(target.pvalue, 10)

    target.value = 12
    self.assertEqual(target.pvalue, 12)

    del target.value
    self.assertEqual(target.pvalue, 0)
734
def test_multiple_inheritance_ctypes_with_weakref_or_dict(self):
    # The test consists of creating the classes below; there are no
    # explicit assertions.

    class Both1(_testcapi.HeapCTypeWithWeakref, _testcapi.HeapCTypeWithDict):
        pass
    class Both2(_testcapi.HeapCTypeWithDict, _testcapi.HeapCTypeWithWeakref):
        pass

    heap_types = (_testcapi.HeapCTypeWithDict, _testcapi.HeapCTypeWithDict2,
                  _testcapi.HeapCTypeWithWeakref, _testcapi.HeapCTypeWithWeakref2)

    for cls in heap_types:
        for cls2 in heap_types:
            if cls is cls2:
                continue
            class S(cls, cls2):
                pass
        class B1(Both1, cls):
            pass
        # NOTE(review): B2 also derives from Both1, so Both2 is never
        # subclassed here - confirm whether Both2 was intended.
        class B2(Both1, cls):
            pass
753
def test_pynumber_tobase(self):
    from _testcapi import pynumber_tobase

    # (value, base) -> expected text, for positive and negative inputs.
    expected = {
        (123, 2): '0b1111011',
        (123, 8): '0o173',
        (123, 10): '123',
        (123, 16): '0x7b',
        (-123, 2): '-0b1111011',
        (-123, 8): '-0o173',
        (-123, 10): '-123',
        (-123, 16): '-0x7b',
    }
    for (value, base), text in expected.items():
        self.assertEqual(pynumber_tobase(value, base), text)

    # Non-integer inputs are expected to raise TypeError.
    with self.assertRaises(TypeError):
        pynumber_tobase(123.0, 10)
    with self.assertRaises(TypeError):
        pynumber_tobase('123', 10)
    # Base 0 is expected to raise SystemError.
    with self.assertRaises(SystemError):
        pynumber_tobase(123, 0)
767
def check_fatal_error(self, code, expected, not_expected=()):
    """Run *code* in a child interpreter and inspect its fatal-error dump.

    The child is started with '-sSI' and is required to fail.  Its stderr
    must contain the 'test_fatal_error: MESSAGE' fatal error line and an
    'Extension modules:' line.  Every name in *expected* must be listed on
    that line, no name in *not_expected* may be listed, and the reported
    total must equal the number of listed names.
    """
    with support.SuppressCrashReport():
        _rc, _out, raw_err = assert_python_failure('-sSI', '-c', code)

    stderr = decode_stderr(raw_err)
    self.assertIn('Fatal Python error: test_fatal_error: MESSAGE\n',
                  stderr)

    found = re.search(r'^Extension modules:(.*) \(total: ([0-9]+)\)$',
                      stderr, re.MULTILINE)
    if found is None:
        self.fail(f"Cannot find 'Extension modules:' in {stderr!r}")
    listed, reported_total = found.groups()
    modules = set(listed.strip().split(', '))

    for name in expected:
        self.assertIn(name, modules)
    for name in not_expected:
        self.assertNotIn(name, modules)
    self.assertEqual(len(modules), int(reported_total))
788
@support.requires_subprocess()
def test_fatal_error(self):
    # By default, stdlib extension modules are ignored,
    # but not test modules.
    expected = ('_testcapi',)
    not_expected = ('sys',)
    code = 'import _testcapi, sys; _testcapi.fatal_error(b"MESSAGE")'
    self.check_fatal_error(code, expected, not_expected)

    # Mark _testcapi as stdlib module, but not sys
    expected = ('sys',)
    not_expected = ('_testcapi',)
    code = textwrap.dedent('''
        import _testcapi, sys
        sys.stdlib_module_names = frozenset({"_testcapi"})
        _testcapi.fatal_error(b"MESSAGE")
    ''')
    # Pass not_expected as well; without it the exclusion of _testcapi
    # defined just above would never be verified.
    self.check_fatal_error(code, expected, not_expected)
807
def test_pyobject_repr_from_null(self):
    # The helper is expected to return the text '<NULL>'.
    self.assertEqual(_testcapi.pyobject_repr_from_null(), '<NULL>')
811
def test_pyobject_str_from_null(self):
    # The helper is expected to return the text '<NULL>'.
    self.assertEqual(_testcapi.pyobject_str_from_null(), '<NULL>')
815
def test_pyobject_bytes_from_null(self):
    # The helper is expected to return the bytes b'<NULL>'.
    self.assertEqual(_testcapi.pyobject_bytes_from_null(), b'<NULL>')
819
def test_Py_CompileString(self):
    # Check that Py_CompileString respects the coding cookie
    source = b"# -*- coding: latin1 -*-\nprint('\xc2\xa4')\n"
    from_c_api = _testcapi.Py_CompileString(source)
    # The builtin compile() on the same bytes serves as the reference.
    reference = compile(source, "<string>", "exec")
    self.assertEqual(from_c_api.co_consts, reference.co_consts)
827
def test_export_symbols(self):
    # bpo-44133: Ensure that the "Py_FrozenMain" and
    # "PyThread_get_thread_native_id" symbols are exported by the Python
    # (directly by the binary, or via by the Python dynamic library).
    ctypes = import_helper.import_module('ctypes')
    symbols = []

    # Test if the PY_HAVE_THREAD_NATIVE_ID macro is defined
    has_native_id = hasattr(_thread, 'get_native_id')
    if has_native_id:
        symbols.append('PyThread_get_thread_native_id')

    # Python/frozenmain.c fails to build on Windows when the symbols are
    # missing:
    # - PyWinFreeze_ExeInit
    # - PyWinFreeze_ExeTerm
    # - PyInitFrozenExtensions
    on_windows = (os.name == 'nt')
    if not on_windows:
        symbols.append('Py_FrozenMain')

    for name in symbols:
        with self.subTest(name=name):
            self.assertTrue(hasattr(ctypes.pythonapi, name))
850
def test_eval_get_func_name(self):
    def function_example(): ...

    class A:
        def method_example(self): ...

    get_name = _testcapi.eval_get_func_name

    self.assertEqual(get_name(function_example), "function_example")
    # Both the function reached through the class and the bound method.
    self.assertEqual(get_name(A.method_example), "method_example")
    self.assertEqual(get_name(A().method_example), "method_example")
    self.assertEqual(get_name(sum), "sum")  # c function
    # For the class object itself, "type" is expected.
    self.assertEqual(get_name(A), "type")
865
def test_eval_get_func_desc(self):
    def function_example(): ...

    class A:
        def method_example(self): ...

    get_desc = _testcapi.eval_get_func_desc

    # Functions, methods and C functions are all expected to yield "()".
    self.assertEqual(get_desc(function_example), "()")
    self.assertEqual(get_desc(A.method_example), "()")
    self.assertEqual(get_desc(A().method_example), "()")
    self.assertEqual(get_desc(sum), "()")  # c function
    # For the class object itself, " object" is expected.
    self.assertEqual(get_desc(A), " object")
880
def test_function_get_code(self):
    from types import CodeType

    def some():
        pass

    # The helper must hand back the function's code object.
    fetched = _testcapi.function_get_code(some)
    self.assertIsInstance(fetched, CodeType)
    self.assertEqual(fetched, some.__code__)

    with self.assertRaises(SystemError):
        _testcapi.function_get_code(None)  # not a function
893
def test_function_get_globals(self):
    def some():
        pass

    # The helper must hand back the function's globals dict.
    fetched = _testcapi.function_get_globals(some)
    self.assertIsInstance(fetched, dict)
    self.assertEqual(fetched, some.__globals__)

    with self.assertRaises(SystemError):
        _testcapi.function_get_globals(None)  # not a function
904
def test_function_get_module(self):
    # _testcapi.function_get_module() must hand back a str equal to the
    # function's __module__ and raise SystemError when given something else.
    def sample():
        pass

    result = _testcapi.function_get_module(sample)
    self.assertIsInstance(result, str)
    self.assertEqual(result, sample.__module__)

    # None is not a function.
    self.assertRaises(SystemError, _testcapi.function_get_module, None)
915
def test_sys_getobject(self):
    # _testcapi.sys_getobject() looks up an attribute of the sys module by
    # a name passed as bytes.
    lookup = _testcapi.sys_getobject
    snake = '\U0001f40d'

    self.assertIs(lookup(b'stdout'), sys.stdout)
    # Temporarily install a non-ASCII attribute to check name decoding.
    with support.swap_attr(sys, snake, 42):
        self.assertEqual(lookup(snake.encode()), 42)

    # A missing name and an undecodable name both yield AttributeError.
    for bad_name in (b'nonexisting', b'\xff'):
        self.assertIs(lookup(bad_name), AttributeError)
    # CRASHES lookup(NULL)
926
def test_sys_setobject(self):
    # _testcapi.sys_setobject() sets an attribute of the sys module by a name
    # passed as bytes; passing NULL as the value removes the attribute.
    # Every call that is expected to succeed returns 0.
    store = _testcapi.sys_setobject
    snake = '\U0001f40d'
    first = ['value']
    second = ['value2']

    try:
        self.assertEqual(store(b'newattr', first), 0)
        self.assertIs(sys.newattr, first)
        self.assertEqual(store(b'newattr', second), 0)
        self.assertIs(sys.newattr, second)
        self.assertEqual(store(b'newattr', NULL), 0)
        self.assertFalse(hasattr(sys, 'newattr'))
        # Removing an attribute that is already gone still returns 0.
        self.assertEqual(store(b'newattr', NULL), 0)
    finally:
        # Never leave the scratch attribute behind, even on failure.
        with contextlib.suppress(AttributeError):
            del sys.newattr

    try:
        self.assertEqual(store(snake.encode(), first), 0)
        self.assertIs(getattr(sys, snake), first)
        self.assertEqual(store(snake.encode(), NULL), 0)
        self.assertFalse(hasattr(sys, snake))
    finally:
        with contextlib.suppress(AttributeError):
            delattr(sys, snake)

    # An undecodable name is rejected.
    with self.assertRaises(UnicodeDecodeError):
        store(b'\xff', first)
    # CRASHES store(NULL, first)
955
956
class TestPendingCalls(unittest.TestCase):
    """Tests driving _testcapi._pending_threadfunc() with and without threads."""

    def pendingcalls_submit(self, l, n):
        """Submit n pending calls, each of which appends None to the list l.

        A short random sleep precedes every submission, and a submission is
        retried until _testcapi._pending_threadfunc() reports success.
        """
        def callback():
            # This function can be interrupted by thread switching, so
            # stick to a single atomic operation.
            l.append(None)

        for _ in range(n):
            time.sleep(random.random() * 0.02)  # 0.01 secs on average
            # Try submitting the callback until successful.  Rely on the
            # regular interrupt to flush the queue while we are unsuccessful.
            while not _testcapi._pending_threadfunc(callback):
                pass

    def pendingcalls_wait(self, l, n, context=None):
        """Busy-wait until the list l holds exactly n entries.

        When a context is given, iterations are only counted towards the
        timeout once context.event is set.  The test fails after 10000
        counted iterations.
        """
        count = 0
        while len(l) != n:
            # This busy loop is where we expect to be interrupted to run
            # our callbacks.  Note that callbacks are only run on the main
            # thread.
            for i in range(1000):
                a = i * i
            if context and not context.event.is_set():
                # Submitting threads have not all finished yet: keep
                # spinning without consuming the timeout budget.
                continue
            count += 1
            self.assertLess(
                count, 10000,
                "timeout waiting for %i callbacks, got %i" % (n, len(l)))

    @threading_helper.requires_working_threading()
    def test_pendingcalls_threaded(self):
        # Submit the callbacks from several threads, a few per thread.
        n = 32  # total callbacks

        class Context:
            pass

        context = Context()
        context.l = []
        context.n = 2  # submits per thread
        context.nThreads = n // context.n
        context.nFinished = 0
        context.lock = threading.Lock()
        context.event = threading.Event()

        threads = [threading.Thread(target=self.pendingcalls_thread,
                                    args=(context,))
                   for _ in range(context.nThreads)]
        with threading_helper.start_threads(threads):
            self.pendingcalls_wait(context.l, n, context)

    def pendingcalls_thread(self, context):
        """Thread body: submit context.n callbacks, then record completion.

        The thread that finishes last sets context.event.
        """
        try:
            self.pendingcalls_submit(context.l, context.n)
        finally:
            with context.lock:
                context.nFinished += 1
                nFinished = context.nFinished
            if nFinished == context.nThreads:
                context.event.set()

    def test_pendingcalls_non_threaded(self):
        # Again, just using the main thread; likely they will all be
        # dispatched at once.  It is ok to ask for too many, because we loop
        # until we find a slot.  The loop can be interrupted to dispatch.
        # There are only 32 dispatch slots, so we go for twice that!
        l = []
        n = 64
        self.pendingcalls_submit(l, n)
        self.pendingcalls_wait(l, n)
1035
1036
class SubinterpreterTest(unittest.TestCase):
    """Tests running code through support.run_in_subinterp()."""

    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_subinterps(self):
        # The subinterpreter reports the ids of its sys.modules and builtins
        # through a pipe; they must differ from the main interpreter's.
        import builtins
        r, w = os.pipe()
        # The write end is opened, and therefore closed, by the script itself.
        code = """if 1:
            import sys, builtins, pickle
            with open({:d}, "wb") as f:
                pickle.dump(id(sys.modules), f)
                pickle.dump(id(builtins), f)
            """.format(w)
        with open(r, "rb") as f:
            ret = support.run_in_subinterp(code)
            self.assertEqual(ret, 0)
            self.assertNotEqual(pickle.load(f), id(sys.modules))
            self.assertNotEqual(pickle.load(f), id(builtins))

    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_subinterps_recent_language_features(self):
        # Run a script using several newer syntax features in a
        # subinterpreter and read its results back through a pipe.
        r, w = os.pipe()
        code = """if 1:
            import pickle
            with open({:d}, "wb") as f:

                @(lambda x:x)  # Py 3.9
                def noop(x): return x

                a = (b := f'1{{2}}3') + noop('x')  # Py 3.8 (:=) / 3.6 (f'')

                async def foo(arg): return await arg  # Py 3.5

                pickle.dump(dict(a=a, b=b), f)
            """.format(w)

        with open(r, "rb") as f:
            ret = support.run_in_subinterp(code)
            self.assertEqual(ret, 0)
            self.assertEqual(pickle.load(f), {'a': '123x', 'b': '123'})

    def test_mutate_exception(self):
        """
        Exceptions saved in global module state get shared between
        individual module instances. This test checks that an attribute
        set on binascii.Error in a subinterpreter does not show up on
        binascii.Error in the main interpreter.
        """
        import binascii

        support.run_in_subinterp("import binascii; binascii.Error.foobar = 'foobar'")

        self.assertFalse(hasattr(binascii.Error, "foobar"))

    @unittest.skipIf(_testmultiphase is None, "test requires _testmultiphase module")
    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_module_state_shared_in_global(self):
        """
        bpo-44050: Extension module state should be shared between interpreters
        when it doesn't support sub-interpreters.
        """
        r, w = os.pipe()
        self.addCleanup(os.close, r)
        self.addCleanup(os.close, w)

        # The same script runs in the main interpreter and in a
        # subinterpreter; each run writes id(module.Error) to the pipe.
        script = textwrap.dedent(f"""
            import importlib.machinery
            import importlib.util
            import os

            fullname = '_test_module_state_shared'
            origin = importlib.util.find_spec('_testmultiphase').origin
            loader = importlib.machinery.ExtensionFileLoader(fullname, origin)
            spec = importlib.util.spec_from_loader(fullname, loader)
            module = importlib.util.module_from_spec(spec)
            attr_id = str(id(module.Error)).encode()

            os.write({w}, attr_id)
            """)
        exec(script)
        main_attr_id = os.read(r, 100)

        ret = support.run_in_subinterp(script)
        self.assertEqual(ret, 0)
        subinterp_attr_id = os.read(r, 100)
        self.assertEqual(main_attr_id, subinterp_attr_id)
1121
1122
class TestThreadState(unittest.TestCase):
    """Thread-state tests driven through _testcapi._test_thread_state()."""

    @threading_helper.reap_threads
    @threading_helper.requires_working_threading()
    def test_thread_state(self):
        # some extra thread-state tests driven via _testcapi
        def target():
            seen = []

            def record():
                seen.append(threading.get_ident())

            _testcapi._test_thread_state(record)
            # NOTE(review): two extra references to the callback are kept
            # alive during the sleep; the reason is not visible here.
            first_ref = second_ref = record
            time.sleep(1)
            # Check our main thread is in the list exactly 3 times.
            own_ident = threading.get_ident()
            self.assertEqual(seen.count(own_ident), 3,
                             "Couldn't find main thread correctly in the list")

        # Run once in the current thread, then once more in a new thread.
        target()
        worker = threading.Thread(target=target)
        worker.start()
        worker.join()

    @threading_helper.reap_threads
    @threading_helper.requires_working_threading()
    def test_gilstate_ensure_no_deadlock(self):
        # See https://github.com/python/cpython/issues/96071
        script = textwrap.dedent("""
            import _testcapi

            def callback():
                print('callback called')

            _testcapi._test_thread_state(callback)
            """)
        # The child interpreter runs with tracemalloc enabled.
        result = assert_python_ok('-X', 'tracemalloc', '-c', script)
        self.assertIn(b'callback called', result.out)
1161
1162
class Test_testcapi(unittest.TestCase):
    # Copy every attribute of _testcapi whose name starts with 'test_' and
    # does not end with '_code' into this class body.
    locals().update({
        name: getattr(_testcapi, name)
        for name in dir(_testcapi)
        if name.startswith('test_') and not name.endswith('_code')
    })

    # Defined after the bulk copy above, so this wrapper takes precedence.
    # Suppress warning from PyUnicode_FromUnicode().
    @warnings_helper.ignore_warnings(category=DeprecationWarning)
    def test_widechar(self):
        _testcapi.test_widechar()

    def test_version_api_data(self):
        # The version constant exported by _testcapi must equal sys.hexversion.
        self.assertEqual(_testcapi.Py_Version, sys.hexversion)
1175
1176
class Test_testinternalcapi(unittest.TestCase):
    # Copy every attribute of _testinternalcapi whose name starts with
    # 'test_' into this class body.
    locals().update({
        name: getattr(_testinternalcapi, name)
        for name in dir(_testinternalcapi)
        if name.startswith('test_')
    })
1181
1182
@support.requires_subprocess()
class PyMemDebugTests(unittest.TestCase):
    """Checks run in a child interpreter started with PYTHONMALLOC='debug'.

    Subclasses override PYTHONMALLOC to repeat the same checks under a
    different allocator setting.
    """

    PYTHONMALLOC = 'debug'
    # '0x04c06e0' or '04C06E0'
    PTR_REGEX = r'(?:0x)?[0-9a-fA-F]+'

    def check(self, code):
        """Run code in a child interpreter that is expected to fail.

        Returns the child's stderr decoded as ASCII, with undecodable bytes
        replaced.
        """
        with support.SuppressCrashReport():
            out = assert_python_failure(
                '-c', code,
                PYTHONMALLOC=self.PYTHONMALLOC,
                # FreeBSD: instruct jemalloc to not fill freed() memory
                # with junk byte 0x5a, see JEMALLOC(3)
                MALLOC_CONF="junk:false",
            )
        stderr = out.err
        return stderr.decode('ascii', 'replace')

    def test_buffer_overflow(self):
        out = self.check('import _testcapi; _testcapi.pymem_buffer_overflow()')
        # The leading spaces reproduce the indentation of the dump, so they
        # are significant.
        regex = (r"Debug memory block at address p={ptr}: API 'm'\n"
                 r"    16 bytes originally requested\n"
                 r"    The [0-9] pad bytes at p-[0-9] are FORBIDDENBYTE, as expected.\n"
                 r"    The [0-9] pad bytes at tail={ptr} are not all FORBIDDENBYTE \(0x[0-9a-f]{{2}}\):\n"
                 r"        at tail\+0: 0x78 \*\*\* OUCH\n"
                 r"        at tail\+1: 0xfd\n"
                 r"        at tail\+2: 0xfd\n"
                 r"    .*\n"
                 r"(    The block was made by call #[0-9]+ to debug malloc/realloc.\n)?"
                 r"    Data at p: cd cd cd .*\n"
                 r"\n"
                 r"Enable tracemalloc to get the memory block allocation traceback\n"
                 r"\n"
                 r"Fatal Python error: _PyMem_DebugRawFree: bad trailing pad byte")
        regex = regex.format(ptr=self.PTR_REGEX)
        # DOTALL lets the ".*" parts span several lines of the dump.
        regex = re.compile(regex, flags=re.DOTALL)
        self.assertRegex(out, regex)

    def test_api_misuse(self):
        out = self.check('import _testcapi; _testcapi.pymem_api_misuse()')
        # The leading spaces reproduce the indentation of the dump, so they
        # are significant.
        regex = (r"Debug memory block at address p={ptr}: API 'm'\n"
                 r"    16 bytes originally requested\n"
                 r"    The [0-9] pad bytes at p-[0-9] are FORBIDDENBYTE, as expected.\n"
                 r"    The [0-9] pad bytes at tail={ptr} are FORBIDDENBYTE, as expected.\n"
                 r"(    The block was made by call #[0-9]+ to debug malloc/realloc.\n)?"
                 r"    Data at p: cd cd cd .*\n"
                 r"\n"
                 r"Enable tracemalloc to get the memory block allocation traceback\n"
                 r"\n"
                 r"Fatal Python error: _PyMem_DebugRawFree: bad ID: Allocated using API 'm', verified using API 'r'\n")
        regex = regex.format(ptr=self.PTR_REGEX)
        self.assertRegex(out, regex)

    def check_malloc_without_gil(self, code):
        """Run code and require the 'called without holding the GIL' error."""
        out = self.check(code)
        expected = ('Fatal Python error: _PyMem_DebugMalloc: '
                    'Python memory allocator called without holding the GIL')
        self.assertIn(expected, out)

    def test_pymem_malloc_without_gil(self):
        # Debug hooks must raise an error if PyMem_Malloc() is called
        # without holding the GIL
        code = 'import _testcapi; _testcapi.pymem_malloc_without_gil()'
        self.check_malloc_without_gil(code)

    def test_pyobject_malloc_without_gil(self):
        # Debug hooks must raise an error if PyObject_Malloc() is called
        # without holding the GIL
        code = 'import _testcapi; _testcapi.pyobject_malloc_without_gil()'
        self.check_malloc_without_gil(code)

    def check_pyobject_is_freed(self, func_name):
        """Call _testcapi.<func_name>() in a child interpreter.

        The child exits with status 0 when the call returns and with
        status 1 when it raises _testcapi.error; the child must succeed.
        """
        code = textwrap.dedent(f'''
            import gc, os, sys, _testcapi
            # Disable the GC to avoid crash on GC collection
            gc.disable()
            try:
                _testcapi.{func_name}()
                # Exit immediately to avoid a crash while deallocating
                # the invalid object
                os._exit(0)
            except _testcapi.error:
                os._exit(1)
        ''')
        assert_python_ok(
            '-c', code,
            PYTHONMALLOC=self.PYTHONMALLOC,
            MALLOC_CONF="junk:false",
        )

    def test_pyobject_null_is_freed(self):
        self.check_pyobject_is_freed('check_pyobject_null_is_freed')

    def test_pyobject_uninitialized_is_freed(self):
        self.check_pyobject_is_freed('check_pyobject_uninitialized_is_freed')

    def test_pyobject_forbidden_bytes_is_freed(self):
        self.check_pyobject_is_freed('check_pyobject_forbidden_bytes_is_freed')

    def test_pyobject_freed_is_freed(self):
        self.check_pyobject_is_freed('check_pyobject_freed_is_freed')
1284
1285
class PyMemMallocDebugTests(PyMemDebugTests):
    """Repeat the PyMemDebugTests checks with PYTHONMALLOC='malloc_debug'."""

    PYTHONMALLOC = 'malloc_debug'
1288
1289
@unittest.skipUnless(support.with_pymalloc(), 'need pymalloc')
class PyMemPymallocDebugTests(PyMemDebugTests):
    """Repeat the PyMemDebugTests checks with PYTHONMALLOC='pymalloc_debug'."""

    PYTHONMALLOC = 'pymalloc_debug'
1293
1294
@unittest.skipUnless(Py_DEBUG, 'need Py_DEBUG')
class PyMemDefaultTests(PyMemDebugTests):
    """Repeat the PyMemDebugTests checks with an empty PYTHONMALLOC."""

    # test default allocator of Python compiled in debug mode
    PYTHONMALLOC = ''
1299
1300
@unittest.skipIf(_testmultiphase is None, "test requires _testmultiphase module")
class Test_ModuleStateAccess(unittest.TestCase):
    """Test access to module state (PEP 573)"""

    # The C part of the tests lives in _testmultiphase, in a module called
    # _testmultiphase_meth_state_access.
    # This module has multi-phase initialization, unlike _testcapi.

    def setUp(self):
        # Load the extension under its own name from the shared library
        # file that provides _testmultiphase.
        ext_name = '_testmultiphase_meth_state_access'  # XXX
        ext_path = importlib.util.find_spec('_testmultiphase').origin
        ext_loader = importlib.machinery.ExtensionFileLoader(ext_name, ext_path)
        ext_spec = importlib.util.spec_from_loader(ext_name, ext_loader)
        loaded = importlib.util.module_from_spec(ext_spec)
        ext_loader.exec_module(loaded)
        self.module = loaded

    def test_subclass_get_module(self):
        """PyType_GetModule for defining_class"""
        class Derived(self.module.StateAccessType):
            pass

        obj = Derived()
        self.assertIs(obj.get_defining_module(), self.module)

    def test_subclass_get_module_with_super(self):
        # Same check, but the subclass overrides the method and delegates
        # to the base implementation through super().
        class Derived(self.module.StateAccessType):
            def get_defining_module(self):
                return super().get_defining_module()

        obj = Derived()
        self.assertIs(obj.get_defining_module(), self.module)

    def test_state_access(self):
        """Checks methods defined with and without argument clinic

        This tests a no-arg method (get_count) and a method with
        both a positional and keyword argument.
        """
        first = self.module.StateAccessType()
        second = self.module.StateAccessType()

        def check_count(expected):
            # Both instances must report the same, expected count.
            self.assertEqual(first.get_count(), second.get_count())
            self.assertEqual(first.get_count(), expected)

        methods = {
            'clinic': first.increment_count_clinic,
            'noclinic': first.increment_count_noclinic,
        }

        for name, increment_count in methods.items():
            with self.subTest(name):
                check_count(0)

                increment_count()
                check_count(1)

                increment_count(3)
                check_count(4)

                increment_count(-2, twice=True)
                check_count(0)

                # Unknown keyword argument.
                with self.assertRaises(TypeError):
                    increment_count(thrice=3)

                # Too many positional arguments.
                with self.assertRaises(TypeError):
                    increment_count(1, 2, 3)

    def test_get_module_bad_def(self):
        # PyType_GetModuleByDef fails gracefully if it doesn't
        # find what it's looking for.
        # see bpo-46433
        obj = self.module.StateAccessType()
        with self.assertRaises(TypeError):
            obj.getmodulebydef_bad_def()

    def test_get_module_static_in_mro(self):
        # Here, the class PyType_GetModuleByDef is looking for
        # appears in the MRO after a static type (BaseException).
        # see bpo-46433
        class Derived(BaseException, self.module.StateAccessType):
            pass

        self.assertIs(Derived().get_defining_module(), self.module)
1387
1388
1389 class ESC[4;38;5;81mTest_FrameAPI(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
1390
1391 def getframe(self):
1392 return sys._getframe()
1393
1394 def getgenframe(self):
1395 yield sys._getframe()
1396
1397 def test_frame_getters(self):
1398 frame = self.getframe()
1399 self.assertEqual(frame.f_locals, _testcapi.frame_getlocals(frame))
1400 self.assertIs(frame.f_globals, _testcapi.frame_getglobals(frame))
1401 self.assertIs(frame.f_builtins, _testcapi.frame_getbuiltins(frame))
1402 self.assertEqual(frame.f_lasti, _testcapi.frame_getlasti(frame))
1403
1404 def test_frame_get_generator(self):
1405 gen = self.getgenframe()
1406 frame = next(gen)
1407 self.assertIs(gen, _testcapi.frame_getgenerator(frame))
1408
1409 def test_frame_fback_api(self):
1410 """Test that accessing `f_back` does not cause a segmentation fault on
1411 a frame created with `PyFrame_New` (GH-99110)."""
1412 def dummy():
1413 pass
1414
1415 frame = _testcapi.frame_new(dummy.__code__, globals(), locals())
1416 # The following line should not cause a segmentation fault.
1417 self.assertIsNone(frame.f_back)
1418
1419
1420 SUFFICIENT_TO_DEOPT_AND_SPECIALIZE = 100
1421
1422 class ESC[4;38;5;81mTest_Pep523API(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
1423
1424 def do_test(self, func, names):
1425 actual_calls = []
1426 start = SUFFICIENT_TO_DEOPT_AND_SPECIALIZE
1427 count = start + SUFFICIENT_TO_DEOPT_AND_SPECIALIZE
1428 try:
1429 for i in range(count):
1430 if i == start:
1431 _testinternalcapi.set_eval_frame_record(actual_calls)
1432 func()
1433 finally:
1434 _testinternalcapi.set_eval_frame_default()
1435 expected_calls = names * SUFFICIENT_TO_DEOPT_AND_SPECIALIZE
1436 self.assertEqual(len(expected_calls), len(actual_calls))
1437 for expected, actual in zip(expected_calls, actual_calls, strict=True):
1438 self.assertEqual(expected, actual)
1439
1440 def test_inlined_binary_subscr(self):
1441 class ESC[4;38;5;81mC:
1442 def __getitem__(self, other):
1443 return None
1444 def func():
1445 C()[42]
1446 names = ["func", "__getitem__"]
1447 self.do_test(func, names)
1448
1449 def test_inlined_call(self):
1450 def inner(x=42):
1451 pass
1452 def func():
1453 inner()
1454 inner(42)
1455 names = ["func", "inner", "inner"]
1456 self.do_test(func, names)
1457
1458
# Run the tests of this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()