1 import contextlib
2 import os
3 import sys
4 import tracemalloc
5 import unittest
6 from unittest.mock import patch
7 from test.support.script_helper import (assert_python_ok, assert_python_failure,
8 interpreter_requires_environment)
9 from test import support
10 from test.support import os_helper
11
12 try:
13 import _testcapi
14 except ImportError:
15 _testcapi = None
16
17
# Size reported by sys.getsizeof() for an empty bytes object.  allocate_bytes()
# subtracts it from the requested size to compute the payload length.
EMPTY_STRING_SIZE = sys.getsizeof(b'')
# Frame counts passed to the "invalid number of frames" command line tests
# (PYTHONTRACEMALLOC and -X tracemalloc=NFRAME).
INVALID_NFRAME = (-1, 2**30)
20
21
def get_frames(nframe, lineno_delta):
    """Return up to *nframe* ``(filename, lineno)`` pairs for the caller's stack.

    The walk starts at the frame that called this function and follows
    ``f_back`` links until *nframe* entries were collected or the stack ends.
    *lineno_delta* is added to the line number of the first (innermost)
    frame only; the remaining frames are reported with their own line number.
    """
    collected = []
    delta = lineno_delta
    current = sys._getframe(1)
    for _ in range(nframe):
        if current is None:
            # Ran out of stack before reaching nframe entries.
            break
        collected.append((current.f_code.co_filename,
                          current.f_lineno + delta))
        # Only the innermost frame gets shifted.
        delta = 0
        current = current.f_back
    return tuple(collected)
34
def allocate_bytes(size):
    """Allocate a bytes object of *size* bytes and return it with its expected traceback.

    Returns a ``(data, traceback)`` pair where *traceback* is the
    tracemalloc.Traceback built from the Python frames of the allocation,
    truncated to the current traceback limit.
    """
    limit = tracemalloc.get_traceback_limit()
    payload_len = size - EMPTY_STRING_SIZE
    # NOTE: the two statements below must stay on consecutive lines:
    # get_frames(limit, 1) records "line of this call + 1", which has to be
    # the line performing the allocation.
    expected_frames = get_frames(limit, 1)
    payload = b'x' * payload_len
    expected_tb = tracemalloc.Traceback(expected_frames,
                                        min(len(expected_frames), limit))
    return payload, expected_tb
41
def create_snapshots():
    """Build two fixed tracemalloc.Snapshot objects used by the snapshot tests.

    Each raw trace is a ``(domain, size, traceback_frames, total_nframe)``
    tuple, where traceback_frames is a tuple of ``(filename, line_number)``
    tuples.  Both snapshots use a traceback limit of 2.
    """
    limit = 2

    # Tracebacks shared by several traces.
    a2_b4 = (('a.py', 2), ('b.py', 4))
    a5_b4 = (('a.py', 5), ('b.py', 4))

    first_traces = [
        (0, 10, a2_b4, 3),
        (0, 10, a2_b4, 3),
        (0, 10, a2_b4, 3),

        (1, 2, a5_b4, 3),

        (2, 66, (('b.py', 1),), 1),

        (3, 7, (('<unknown>', 0),), 1),
    ]

    second_traces = [
        (0, 10, a2_b4, 3),
        (0, 10, a2_b4, 3),
        (0, 10, a2_b4, 3),

        (2, 2, a5_b4, 3),
        (2, 5000, a5_b4, 3),

        (4, 400, (('c.py', 578),), 1),
    ]

    return (tracemalloc.Snapshot(first_traces, limit),
            tracemalloc.Snapshot(second_traces, limit))
74
def frame(filename, lineno):
    """Return a tracemalloc.Frame for the given *filename* and *lineno*.

    The frame class is exposed publicly as ``tracemalloc.Frame``; the
    previously referenced ``tracemalloc._Frame`` is not a documented name,
    so calling this helper is expected to fail with AttributeError.
    """
    return tracemalloc.Frame((filename, lineno))
77
def traceback(*frames):
    """Return a tracemalloc.Traceback built from ``(filename, lineno)`` pairs.

    The pairs are given as positional arguments and handed to the Traceback
    constructor as a single tuple.
    """
    frame_pairs = frames
    return tracemalloc.Traceback(frame_pairs)
80
def traceback_lineno(filename, lineno):
    """Return a single-frame traceback for *filename* at line *lineno*."""
    only_frame = (filename, lineno)
    return traceback(only_frame)
83
def traceback_filename(filename):
    """Return a single-frame traceback for *filename* with line number 0."""
    return traceback_lineno(filename, lineno=0)
86
87
class TestTraceback(unittest.TestCase):
    """Check the repr() of tracemalloc.Traceback objects."""

    def test_repr(self):
        def tb_repr(*ctor_args) -> str:
            return repr(tracemalloc.Traceback(*ctor_args))

        # Empty traceback, without and with an explicit total frame count.
        self.assertEqual(tb_repr(()), "<Traceback ()>")
        self.assertEqual(tb_repr((), 0), "<Traceback () total_nframe=0>")

        # The repr lists the frames in the reverse of the order they were
        # passed in, and appends total_nframe only when it was given.
        raw_frames = (("f1", 1), ("f2", 2))
        frames_repr = (
            "(<Frame filename='f2' lineno=2>,"
            " <Frame filename='f1' lineno=1>)"
        )
        for extra_args, suffix in (((), ""), ((2,), " total_nframe=2")):
            self.assertEqual(tb_repr(raw_frames, *extra_args),
                             f"<Traceback {frames_repr}{suffix}>")
105
106
class TestTracemallocEnabled(unittest.TestCase):
    """Exercise the tracemalloc API while tracing is enabled.

    setUp() starts tracing with a traceback limit of one frame and
    tearDown() stops it; tests needing another limit restart tracing
    themselves.
    """

    def setUp(self):
        # These tests control the tracing state themselves, so they cannot
        # run if tracing is already enabled.
        if tracemalloc.is_tracing():
            self.skipTest("tracemalloc must be stopped before the test")

        tracemalloc.start(1)

    def tearDown(self):
        tracemalloc.stop()

    def test_get_tracemalloc_memory(self):
        data = [allocate_bytes(123) for count in range(1000)]
        size = tracemalloc.get_tracemalloc_memory()
        self.assertGreaterEqual(size, 0)

        # clearing the traces must not increase the memory used by
        # tracemalloc itself
        tracemalloc.clear_traces()
        size2 = tracemalloc.get_tracemalloc_memory()
        self.assertGreaterEqual(size2, 0)
        self.assertLessEqual(size2, size)

    def test_get_object_traceback(self):
        tracemalloc.clear_traces()
        obj_size = 12345
        obj, obj_traceback = allocate_bytes(obj_size)
        traceback = tracemalloc.get_object_traceback(obj)
        self.assertEqual(traceback, obj_traceback)

    def test_new_reference(self):
        tracemalloc.clear_traces()
        # gc.collect() indirectly calls PyList_ClearFreeList()
        support.gc_collect()

        # Create a list and "destroy it": put it in the PyListObject free list
        obj = []
        obj = None

        # Create a list which should reuse the previously created empty list.
        # NOTE: get_frames(nframe, -3) below refers to the "obj = []" line,
        # which must therefore stay exactly three lines above that call.
        obj = []

        nframe = tracemalloc.get_traceback_limit()
        frames = get_frames(nframe, -3)
        obj_traceback = tracemalloc.Traceback(frames, min(len(frames), nframe))

        traceback = tracemalloc.get_object_traceback(obj)
        self.assertIsNotNone(traceback)
        self.assertEqual(traceback, obj_traceback)

    def test_set_traceback_limit(self):
        obj_size = 10

        # a negative limit is rejected
        tracemalloc.stop()
        self.assertRaises(ValueError, tracemalloc.start, -1)

        # limit of 10 frames
        tracemalloc.stop()
        tracemalloc.start(10)
        obj2, obj2_traceback = allocate_bytes(obj_size)
        traceback = tracemalloc.get_object_traceback(obj2)
        self.assertEqual(len(traceback), 10)
        self.assertEqual(traceback, obj2_traceback)

        # limit of 1 frame
        tracemalloc.stop()
        tracemalloc.start(1)
        obj, obj_traceback = allocate_bytes(obj_size)
        traceback = tracemalloc.get_object_traceback(obj)
        self.assertEqual(len(traceback), 1)
        self.assertEqual(traceback, obj_traceback)

    def find_trace(self, traces, traceback):
        # Return the first raw trace whose frames tuple equals the frames of
        # the given Traceback; fail the test if there is none.
        for trace in traces:
            if trace[2] == traceback._frames:
                return trace

        self.fail("trace not found")

    def test_get_traces(self):
        tracemalloc.clear_traces()
        obj_size = 12345
        obj, obj_traceback = allocate_bytes(obj_size)

        traces = tracemalloc._get_traces()
        trace = self.find_trace(traces, obj_traceback)

        self.assertIsInstance(trace, tuple)
        _domain, size, traceback, _length = trace
        self.assertEqual(size, obj_size)
        self.assertEqual(traceback, obj_traceback._frames)

        # no traces are reported once tracing is stopped
        tracemalloc.stop()
        self.assertEqual(tracemalloc._get_traces(), [])

    def test_get_traces_intern_traceback(self):
        # dummy wrappers to get more useful and identical frames in the traceback
        def allocate_bytes2(size):
            return allocate_bytes(size)
        def allocate_bytes3(size):
            return allocate_bytes2(size)
        def allocate_bytes4(size):
            return allocate_bytes3(size)

        # Ensure that two identical tracebacks are not duplicated
        tracemalloc.stop()
        tracemalloc.start(4)
        obj_size = 123
        obj1, obj1_traceback = allocate_bytes4(obj_size)
        obj2, obj2_traceback = allocate_bytes4(obj_size)

        traces = tracemalloc._get_traces()

        # find_trace() compares Traceback._frames with the raw frames, so
        # flip the stored order to the order used by the raw traces
        obj1_traceback._frames = tuple(reversed(obj1_traceback._frames))
        obj2_traceback._frames = tuple(reversed(obj2_traceback._frames))

        trace1 = self.find_trace(traces, obj1_traceback)
        trace2 = self.find_trace(traces, obj2_traceback)
        _domain1, _size1, traceback1, _length1 = trace1
        _domain2, _size2, traceback2, _length2 = trace2
        self.assertIs(traceback2, traceback1)

    def test_get_traced_memory(self):
        # Python allocates some internals objects, so the test must tolerate
        # a small difference between the expected size and the real usage
        max_error = 2048

        # allocate one object
        obj_size = 1024 * 1024
        tracemalloc.clear_traces()
        obj, obj_traceback = allocate_bytes(obj_size)
        size, peak_size = tracemalloc.get_traced_memory()
        self.assertGreaterEqual(size, obj_size)
        self.assertGreaterEqual(peak_size, size)

        self.assertLessEqual(size - obj_size, max_error)
        self.assertLessEqual(peak_size - size, max_error)

        # destroy the object
        obj = None
        size2, peak_size2 = tracemalloc.get_traced_memory()
        self.assertLess(size2, size)
        self.assertGreaterEqual(size - size2, obj_size - max_error)
        self.assertGreaterEqual(peak_size2, peak_size)

        # clear_traces() must reset traced memory counters
        tracemalloc.clear_traces()
        self.assertEqual(tracemalloc.get_traced_memory(), (0, 0))

        # allocate another object
        obj, obj_traceback = allocate_bytes(obj_size)
        size, peak_size = tracemalloc.get_traced_memory()
        self.assertGreaterEqual(size, obj_size)

        # stop() also resets traced memory counters
        tracemalloc.stop()
        self.assertEqual(tracemalloc.get_traced_memory(), (0, 0))

    def test_clear_traces(self):
        obj, obj_traceback = allocate_bytes(123)
        traceback = tracemalloc.get_object_traceback(obj)
        self.assertIsNotNone(traceback)

        # the traceback of an existing object is forgotten as well
        tracemalloc.clear_traces()
        traceback2 = tracemalloc.get_object_traceback(obj)
        self.assertIsNone(traceback2)

    def test_reset_peak(self):
        # Start from empty traces so that the peak measured below is caused
        # by the temporary allocation made by this test
        tracemalloc.clear_traces()

        # Example: allocate a large piece of memory, temporarily
        large_sum = sum(list(range(100000)))
        size1, peak1 = tracemalloc.get_traced_memory()

        # reset_peak() resets peak to traced memory: peak2 < peak1
        tracemalloc.reset_peak()
        size2, peak2 = tracemalloc.get_traced_memory()
        self.assertGreaterEqual(peak2, size2)
        self.assertLess(peak2, peak1)

        # check that peak continue to be updated if new memory is allocated:
        # peak3 > peak2
        obj_size = 1024 * 1024
        obj, obj_traceback = allocate_bytes(obj_size)
        size3, peak3 = tracemalloc.get_traced_memory()
        self.assertGreaterEqual(peak3, size3)
        self.assertGreater(peak3, peak2)
        self.assertGreaterEqual(peak3 - peak2, obj_size)

    def test_is_tracing(self):
        tracemalloc.stop()
        self.assertFalse(tracemalloc.is_tracing())

        tracemalloc.start()
        self.assertTrue(tracemalloc.is_tracing())

    def test_snapshot(self):
        obj, source = allocate_bytes(123)

        # take a snapshot
        snapshot = tracemalloc.take_snapshot()

        # This can vary
        self.assertGreater(snapshot.traces[1].traceback.total_nframe, 10)

        # write on disk; the cleanup is registered first so that the file is
        # removed even if dump() fails after creating it
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        snapshot.dump(os_helper.TESTFN)

        # load from disk
        snapshot2 = tracemalloc.Snapshot.load(os_helper.TESTFN)
        self.assertEqual(snapshot2.traces, snapshot.traces)

        # tracemalloc must be tracing memory allocations to take a snapshot
        tracemalloc.stop()
        with self.assertRaises(RuntimeError) as cm:
            tracemalloc.take_snapshot()
        self.assertEqual(str(cm.exception),
                         "the tracemalloc module must be tracing memory "
                         "allocations to take a snapshot")

    def test_snapshot_save_attr(self):
        # take a snapshot with a new attribute
        snapshot = tracemalloc.take_snapshot()
        snapshot.test_attr = "new"
        # register the cleanup before writing, see test_snapshot()
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        snapshot.dump(os_helper.TESTFN)

        # load() should recreate the attribute
        snapshot2 = tracemalloc.Snapshot.load(os_helper.TESTFN)
        self.assertEqual(snapshot2.test_attr, "new")

    def fork_child(self):
        # Runs in the forked child; the return value is used as the exit code
        # of the child process (0 means success).
        if not tracemalloc.is_tracing():
            return 2

        obj_size = 12345
        obj, obj_traceback = allocate_bytes(obj_size)
        traceback = tracemalloc.get_object_traceback(obj)
        if traceback is None:
            return 3

        # everything is fine
        return 0

    @support.requires_fork()
    def test_fork(self):
        # check that tracemalloc is still working after fork
        pid = os.fork()
        if not pid:
            # child: always leave through os._exit(); the exit code stays 1
            # if fork_child() raises
            exitcode = 1
            try:
                exitcode = self.fork_child()
            finally:
                os._exit(exitcode)
        else:
            support.wait_process(pid, exitcode=0)

    def test_no_incomplete_frames(self):
        tracemalloc.stop()
        tracemalloc.start(8)

        def f(x):
            def g():
                return x
            return g

        # Look at the traceback of the closure cell created for "x": its last
        # frame must be in this test file and the one before must not.
        obj = f(0).__closure__[0]
        traceback = tracemalloc.get_object_traceback(obj)
        self.assertIn("test_tracemalloc", traceback[-1].filename)
        self.assertNotIn("test_tracemalloc", traceback[-2].filename)
376
377
class TestSnapshot(unittest.TestCase):
    """Tests of tracemalloc.Snapshot based on the fixed traces of create_snapshots()."""

    maxDiff = 4000

    def test_create_snapshot(self):
        raw_traces = [(0, 5, (('a.py', 2),), 10)]

        # take_snapshot() is fed canned data: the tracing state, the
        # traceback limit and the raw traces are all patched
        with contextlib.ExitStack() as stack:
            stack.enter_context(patch.object(tracemalloc, 'is_tracing',
                                             return_value=True))
            stack.enter_context(patch.object(tracemalloc, 'get_traceback_limit',
                                             return_value=5))
            stack.enter_context(patch.object(tracemalloc, '_get_traces',
                                             return_value=raw_traces))

            snapshot = tracemalloc.take_snapshot()
            self.assertEqual(snapshot.traceback_limit, 5)
            self.assertEqual(len(snapshot.traces), 1)
            trace = snapshot.traces[0]
            self.assertEqual(trace.size, 5)
            self.assertEqual(trace.traceback.total_nframe, 10)
            self.assertEqual(len(trace.traceback), 1)
            self.assertEqual(trace.traceback[0].filename, 'a.py')
            self.assertEqual(trace.traceback[0].lineno, 2)

    def test_filter_traces(self):
        snapshot, snapshot2 = create_snapshots()
        filter1 = tracemalloc.Filter(False, "b.py")
        filter2 = tracemalloc.Filter(True, "a.py", 2)
        filter3 = tracemalloc.Filter(True, "a.py", 5)

        original_traces = list(snapshot.traces._traces)

        # exclude b.py
        snapshot3 = snapshot.filter_traces((filter1,))
        self.assertEqual(snapshot3.traces._traces, [
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (1, 2, (('a.py', 5), ('b.py', 4)), 3),
            (3, 7, (('<unknown>', 0),), 1),
        ])

        # filter_traces() must not touch the original snapshot
        self.assertEqual(snapshot.traces._traces, original_traces)

        # only include two lines of a.py
        snapshot4 = snapshot3.filter_traces((filter2, filter3))
        self.assertEqual(snapshot4.traces._traces, [
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (1, 2, (('a.py', 5), ('b.py', 4)), 3),
        ])

        # No filter: just duplicate the snapshot
        snapshot5 = snapshot.filter_traces(())
        self.assertIsNot(snapshot5, snapshot)
        self.assertIsNot(snapshot5.traces, snapshot.traces)
        self.assertEqual(snapshot5.traces, snapshot.traces)

        # filters must be passed as an iterable, not as a single filter
        self.assertRaises(TypeError, snapshot.filter_traces, filter1)

    def test_filter_traces_domain(self):
        snapshot, snapshot2 = create_snapshots()
        filter1 = tracemalloc.Filter(False, "a.py", domain=1)
        filter2 = tracemalloc.Filter(True, "a.py", domain=1)

        original_traces = list(snapshot.traces._traces)

        # exclude a.py of domain 1
        snapshot3 = snapshot.filter_traces((filter1,))
        self.assertEqual(snapshot3.traces._traces, [
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (2, 66, (('b.py', 1),), 1),
            (3, 7, (('<unknown>', 0),), 1),
        ])

        # only include a.py of domain 1 (the inclusive filter was previously
        # created but never exercised)
        snapshot4 = snapshot.filter_traces((filter2,))
        self.assertEqual(snapshot4.traces._traces, [
            (1, 2, (('a.py', 5), ('b.py', 4)), 3),
        ])

        # filter_traces() must not touch the original snapshot
        self.assertEqual(snapshot.traces._traces, original_traces)

    def test_filter_traces_domain_filter(self):
        snapshot, snapshot2 = create_snapshots()
        filter1 = tracemalloc.DomainFilter(False, domain=3)
        filter2 = tracemalloc.DomainFilter(True, domain=3)

        # exclude domain 3
        snapshot3 = snapshot.filter_traces((filter1,))
        self.assertEqual(snapshot3.traces._traces, [
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (1, 2, (('a.py', 5), ('b.py', 4)), 3),
            (2, 66, (('b.py', 1),), 1),
        ])

        # include domain 3
        snapshot3 = snapshot.filter_traces((filter2,))
        self.assertEqual(snapshot3.traces._traces, [
            (3, 7, (('<unknown>', 0),), 1),
        ])

    def test_snapshot_group_by_line(self):
        snapshot, snapshot2 = create_snapshots()
        tb_0 = traceback_lineno('<unknown>', 0)
        tb_a_2 = traceback_lineno('a.py', 2)
        tb_a_5 = traceback_lineno('a.py', 5)
        tb_b_1 = traceback_lineno('b.py', 1)
        tb_c_578 = traceback_lineno('c.py', 578)

        # stats per file and line
        stats1 = snapshot.statistics('lineno')
        self.assertEqual(stats1, [
            tracemalloc.Statistic(tb_b_1, 66, 1),
            tracemalloc.Statistic(tb_a_2, 30, 3),
            tracemalloc.Statistic(tb_0, 7, 1),
            tracemalloc.Statistic(tb_a_5, 2, 1),
        ])

        # stats per file and line (2)
        stats2 = snapshot2.statistics('lineno')
        self.assertEqual(stats2, [
            tracemalloc.Statistic(tb_a_5, 5002, 2),
            tracemalloc.Statistic(tb_c_578, 400, 1),
            tracemalloc.Statistic(tb_a_2, 30, 3),
        ])

        # stats diff per file and line
        statistics = snapshot2.compare_to(snapshot, 'lineno')
        self.assertEqual(statistics, [
            tracemalloc.StatisticDiff(tb_a_5, 5002, 5000, 2, 1),
            tracemalloc.StatisticDiff(tb_c_578, 400, 400, 1, 1),
            tracemalloc.StatisticDiff(tb_b_1, 0, -66, 0, -1),
            tracemalloc.StatisticDiff(tb_0, 0, -7, 0, -1),
            tracemalloc.StatisticDiff(tb_a_2, 30, 0, 3, 0),
        ])

    def test_snapshot_group_by_file(self):
        snapshot, snapshot2 = create_snapshots()
        tb_0 = traceback_filename('<unknown>')
        tb_a = traceback_filename('a.py')
        tb_b = traceback_filename('b.py')
        tb_c = traceback_filename('c.py')

        # stats per file
        stats1 = snapshot.statistics('filename')
        self.assertEqual(stats1, [
            tracemalloc.Statistic(tb_b, 66, 1),
            tracemalloc.Statistic(tb_a, 32, 4),
            tracemalloc.Statistic(tb_0, 7, 1),
        ])

        # stats per file (2)
        stats2 = snapshot2.statistics('filename')
        self.assertEqual(stats2, [
            tracemalloc.Statistic(tb_a, 5032, 5),
            tracemalloc.Statistic(tb_c, 400, 1),
        ])

        # stats diff per file
        diff = snapshot2.compare_to(snapshot, 'filename')
        self.assertEqual(diff, [
            tracemalloc.StatisticDiff(tb_a, 5032, 5000, 5, 1),
            tracemalloc.StatisticDiff(tb_c, 400, 400, 1, 1),
            tracemalloc.StatisticDiff(tb_b, 0, -66, 0, -1),
            tracemalloc.StatisticDiff(tb_0, 0, -7, 0, -1),
        ])

    def test_snapshot_group_by_traceback(self):
        snapshot, snapshot2 = create_snapshots()

        # stats per traceback
        tb1 = traceback(('a.py', 2), ('b.py', 4))
        tb2 = traceback(('a.py', 5), ('b.py', 4))
        tb3 = traceback(('b.py', 1))
        tb4 = traceback(('<unknown>', 0))
        stats1 = snapshot.statistics('traceback')
        self.assertEqual(stats1, [
            tracemalloc.Statistic(tb3, 66, 1),
            tracemalloc.Statistic(tb1, 30, 3),
            tracemalloc.Statistic(tb4, 7, 1),
            tracemalloc.Statistic(tb2, 2, 1),
        ])

        # stats per traceback (2)
        tb5 = traceback(('c.py', 578))
        stats2 = snapshot2.statistics('traceback')
        self.assertEqual(stats2, [
            tracemalloc.Statistic(tb2, 5002, 2),
            tracemalloc.Statistic(tb5, 400, 1),
            tracemalloc.Statistic(tb1, 30, 3),
        ])

        # stats diff per traceback
        diff = snapshot2.compare_to(snapshot, 'traceback')
        self.assertEqual(diff, [
            tracemalloc.StatisticDiff(tb2, 5002, 5000, 2, 1),
            tracemalloc.StatisticDiff(tb5, 400, 400, 1, 1),
            tracemalloc.StatisticDiff(tb3, 0, -66, 0, -1),
            tracemalloc.StatisticDiff(tb4, 0, -7, 0, -1),
            tracemalloc.StatisticDiff(tb1, 30, 0, 3, 0),
        ])

        # cumulative statistics are rejected when grouping by traceback
        self.assertRaises(ValueError,
                          snapshot.statistics, 'traceback', cumulative=True)

    def test_snapshot_group_by_cumulative(self):
        snapshot, snapshot2 = create_snapshots()
        tb_0 = traceback_filename('<unknown>')
        tb_a = traceback_filename('a.py')
        tb_b = traceback_filename('b.py')
        tb_a_2 = traceback_lineno('a.py', 2)
        tb_a_5 = traceback_lineno('a.py', 5)
        tb_b_1 = traceback_lineno('b.py', 1)
        tb_b_4 = traceback_lineno('b.py', 4)

        # per file
        stats = snapshot.statistics('filename', True)
        self.assertEqual(stats, [
            tracemalloc.Statistic(tb_b, 98, 5),
            tracemalloc.Statistic(tb_a, 32, 4),
            tracemalloc.Statistic(tb_0, 7, 1),
        ])

        # per line
        stats = snapshot.statistics('lineno', True)
        self.assertEqual(stats, [
            tracemalloc.Statistic(tb_b_1, 66, 1),
            tracemalloc.Statistic(tb_b_4, 32, 4),
            tracemalloc.Statistic(tb_a_2, 30, 3),
            tracemalloc.Statistic(tb_0, 7, 1),
            tracemalloc.Statistic(tb_a_5, 2, 1),
        ])

    def test_trace_format(self):
        snapshot, snapshot2 = create_snapshots()
        trace = snapshot.traces[0]
        self.assertEqual(str(trace), 'b.py:4: 10 B')
        traceback = trace.traceback
        self.assertEqual(str(traceback), 'b.py:4')
        frame = traceback[0]
        self.assertEqual(str(frame), 'b.py:4')

    def test_statistic_format(self):
        snapshot, snapshot2 = create_snapshots()
        stats = snapshot.statistics('lineno')
        stat = stats[0]
        self.assertEqual(str(stat),
                         'b.py:1: size=66 B, count=1, average=66 B')

    def test_statistic_diff_format(self):
        snapshot, snapshot2 = create_snapshots()
        stats = snapshot2.compare_to(snapshot, 'lineno')
        stat = stats[0]
        self.assertEqual(str(stat),
                         'a.py:5: size=5002 B (+5000 B), count=2 (+1), average=2501 B')

    def test_slices(self):
        snapshot, snapshot2 = create_snapshots()
        self.assertEqual(snapshot.traces[:2],
                         (snapshot.traces[0], snapshot.traces[1]))

        traceback = snapshot.traces[0].traceback
        self.assertEqual(traceback[:2],
                         (traceback[0], traceback[1]))

    def test_format_traceback(self):
        snapshot, snapshot2 = create_snapshots()

        # Stand-in for linecache.getline(): produce a recognizable "source
        # line" for any (filename, lineno) without reading files.
        def getline(filename, lineno):
            return ' <%s, %s>' % (filename, lineno)

        with patch('tracemalloc.linecache.getline', side_effect=getline):
            tb = snapshot.traces[0].traceback
            # Traceback.format() indents the "File" lines with two spaces and
            # the source lines with four spaces.
            self.assertEqual(tb.format(),
                             ['  File "b.py", line 4',
                              '    <b.py, 4>',
                              '  File "a.py", line 2',
                              '    <a.py, 2>'])

            self.assertEqual(tb.format(limit=1),
                             ['  File "a.py", line 2',
                              '    <a.py, 2>'])

            self.assertEqual(tb.format(limit=-1),
                             ['  File "b.py", line 4',
                              '    <b.py, 4>'])

            self.assertEqual(tb.format(most_recent_first=True),
                             ['  File "a.py", line 2',
                              '    <a.py, 2>',
                              '  File "b.py", line 4',
                              '    <b.py, 4>'])

            self.assertEqual(tb.format(limit=1, most_recent_first=True),
                             ['  File "a.py", line 2',
                              '    <a.py, 2>'])

            self.assertEqual(tb.format(limit=-1, most_recent_first=True),
                             ['  File "b.py", line 4',
                              '    <b.py, 4>'])
686
687
class TestFilters(unittest.TestCase):
    """Tests of tracemalloc.Filter: attributes, frame matching and traceback matching."""

    maxDiff = 2048

    def test_filter_attributes(self):
        def check(flt, inclusive, pattern, lineno, all_frames):
            self.assertEqual(flt.inclusive, inclusive)
            self.assertEqual(flt.filename_pattern, pattern)
            if lineno is None:
                self.assertIsNone(flt.lineno)
            else:
                self.assertEqual(flt.lineno, lineno)
            self.assertEqual(flt.all_frames, all_frames)

        # test default values
        check(tracemalloc.Filter(True, "abc"), True, "abc", None, False)

        # test custom values
        check(tracemalloc.Filter(False, "test.py", 123, True),
              False, "test.py", 123, True)

        # parameters passed by keyword
        by_keyword = tracemalloc.Filter(inclusive=False,
                                        filename_pattern="test.py",
                                        lineno=123, all_frames=True)
        check(by_keyword, False, "test.py", 123, True)

        # read-only attribute
        self.assertRaises(AttributeError,
                          setattr, by_keyword, "filename_pattern", "abc")

    def test_filter_match(self):
        linenos = (0, 5, 10)

        # filter without line number
        flt = tracemalloc.Filter(True, "abc")
        for lineno in linenos:
            self.assertTrue(flt._match_frame("abc", lineno))
            self.assertFalse(flt._match_frame("12356", lineno))

        flt = tracemalloc.Filter(False, "abc")
        for lineno in linenos:
            self.assertFalse(flt._match_frame("abc", lineno))
            self.assertTrue(flt._match_frame("12356", lineno))

        # filter with line number > 0
        flt = tracemalloc.Filter(True, "abc", 5)
        self.assertTrue(flt._match_frame("abc", 5))
        for lineno in (0, 10):
            self.assertFalse(flt._match_frame("abc", lineno))
        for lineno in linenos:
            self.assertFalse(flt._match_frame("12356", lineno))

        flt = tracemalloc.Filter(False, "abc", 5)
        self.assertFalse(flt._match_frame("abc", 5))
        for lineno in (0, 10):
            self.assertTrue(flt._match_frame("abc", lineno))
        for lineno in linenos:
            self.assertTrue(flt._match_frame("12356", lineno))

        # filter with line number 0
        flt = tracemalloc.Filter(True, "abc", 0)
        self.assertTrue(flt._match_frame("abc", 0))
        for lineno in (5, 10):
            self.assertFalse(flt._match_frame("abc", lineno))
        for lineno in linenos:
            self.assertFalse(flt._match_frame("12356", lineno))

        flt = tracemalloc.Filter(False, "abc", 0)
        self.assertFalse(flt._match_frame("abc", 0))
        for lineno in (5, 10):
            self.assertTrue(flt._match_frame("abc", lineno))
        for lineno in linenos:
            self.assertTrue(flt._match_frame("12356", lineno))

    def test_filter_match_filename(self):
        def matches(inclusive, filename, pattern):
            return tracemalloc.Filter(inclusive, pattern)._match_frame(filename, 0)

        other_names = ("12356", "<unknown>")

        # inclusive filter: only the matching filename is accepted
        self.assertTrue(matches(True, "abc", "abc"))
        for name in other_names:
            self.assertFalse(matches(True, name, "abc"))

        # exclusive filter: everything but the matching filename is accepted
        self.assertFalse(matches(False, "abc", "abc"))
        for name in other_names:
            self.assertTrue(matches(False, name, "abc"))

    def test_filter_match_filename_joker(self):
        def fnmatch(filename, pattern):
            flt = tracemalloc.Filter(True, pattern)
            return flt._match_frame(filename, 0)

        # empty string
        self.assertFalse(fnmatch('abc', ''))
        self.assertFalse(fnmatch('', 'abc'))
        self.assertTrue(fnmatch('', ''))
        self.assertTrue(fnmatch('', '*'))

        # no *
        self.assertTrue(fnmatch('abc', 'abc'))
        self.assertFalse(fnmatch('abc', 'abcd'))
        self.assertFalse(fnmatch('abc', 'def'))

        # a*
        self.assertTrue(fnmatch('abc', 'a*'))
        self.assertTrue(fnmatch('abc', 'abc*'))
        self.assertFalse(fnmatch('abc', 'b*'))
        self.assertFalse(fnmatch('abc', 'abcd*'))

        # a*b
        self.assertTrue(fnmatch('abc', 'a*c'))
        self.assertTrue(fnmatch('abcdcx', 'a*cx'))
        self.assertFalse(fnmatch('abb', 'a*c'))
        self.assertFalse(fnmatch('abcdce', 'a*cx'))

        # a*b*c
        self.assertTrue(fnmatch('abcde', 'a*c*e'))
        self.assertTrue(fnmatch('abcbdefeg', 'a*bd*eg'))
        self.assertFalse(fnmatch('abcdd', 'a*c*e'))
        self.assertFalse(fnmatch('abcbdefef', 'a*bd*eg'))

        # replace .pyc suffix with .py
        self.assertTrue(fnmatch('a.pyc', 'a.py'))
        self.assertTrue(fnmatch('a.py', 'a.pyc'))

        # The remaining checks are expected to match on Windows only
        # (os.name == 'nt') and to fail to match everywhere else.
        if os.name == 'nt':
            windows_only = self.assertTrue
        else:
            windows_only = self.assertFalse

        # case insensitive on Windows, case sensitive elsewhere
        windows_only(fnmatch('aBC', 'ABc'))
        windows_only(fnmatch('aBcDe', 'Ab*dE'))

        windows_only(fnmatch('a.pyc', 'a.PY'))
        windows_only(fnmatch('a.py', 'a.PYC'))

        # Windows normalizes the alternate separator "/" to the standard
        # separator "\"; elsewhere there is no alternate separator
        windows_only(fnmatch(r'a/b', r'a\b'))
        windows_only(fnmatch(r'a\b', r'a/b'))
        windows_only(fnmatch(r'a/b\c', r'a\b/c'))
        windows_only(fnmatch(r'a/b/c', r'a\b\c'))

        # as of 3.5, .pyo is no longer munged to .py
        self.assertFalse(fnmatch('a.pyo', 'a.py'))

    def test_filter_match_trace(self):
        t1 = (("a.py", 2), ("b.py", 3))
        t2 = (("b.py", 4), ("b.py", 5))
        t3 = (("c.py", 5), ('<unknown>', 0))
        unknown = (('<unknown>', 0),)

        match = tracemalloc.Filter(True, "b.py", all_frames=True)._match_traceback
        self.assertTrue(match(t1))
        self.assertTrue(match(t2))
        self.assertFalse(match(t3))
        self.assertFalse(match(unknown))

        match = tracemalloc.Filter(True, "b.py", all_frames=False)._match_traceback
        self.assertFalse(match(t1))
        self.assertTrue(match(t2))
        self.assertFalse(match(t3))
        self.assertFalse(match(unknown))

        match = tracemalloc.Filter(False, "b.py", all_frames=True)._match_traceback
        self.assertFalse(match(t1))
        self.assertFalse(match(t2))
        self.assertTrue(match(t3))
        self.assertTrue(match(unknown))

        match = tracemalloc.Filter(False, "b.py", all_frames=False)._match_traceback
        self.assertTrue(match(t1))
        self.assertFalse(match(t2))
        self.assertTrue(match(t3))
        self.assertTrue(match(unknown))

        match = tracemalloc.Filter(False, "<unknown>", all_frames=False)._match_traceback
        self.assertTrue(match(t1))
        self.assertTrue(match(t2))
        self.assertTrue(match(t3))
        self.assertFalse(match(unknown))

        match = tracemalloc.Filter(True, "<unknown>", all_frames=True)._match_traceback
        self.assertFalse(match(t1))
        self.assertFalse(match(t2))
        self.assertTrue(match(t3))
        self.assertTrue(match(unknown))

        match = tracemalloc.Filter(False, "<unknown>", all_frames=True)._match_traceback
        self.assertTrue(match(t1))
        self.assertTrue(match(t2))
        self.assertFalse(match(t3))
        self.assertFalse(match(unknown))
897
898
class TestCommandLine(unittest.TestCase):
    """Check how PYTHONTRACEMALLOC and -X tracemalloc configure tracing in a child interpreter."""

    # Snippets executed by the child interpreter.
    _PRINT_IS_TRACING = 'import tracemalloc; print(tracemalloc.is_tracing())'
    _PRINT_LIMIT = 'import tracemalloc; print(tracemalloc.get_traceback_limit())'

    def test_env_var_disabled_by_default(self):
        # not tracing by default
        _, out, _ = assert_python_ok('-c', self._PRINT_IS_TRACING)
        self.assertEqual(out.rstrip(), b'False')

    @unittest.skipIf(interpreter_requires_environment(),
                     'Cannot run -E tests when PYTHON env vars are required.')
    def test_env_var_ignored_with_E(self):
        """PYTHON* environment variables must be ignored when -E is present."""
        _, out, _ = assert_python_ok('-E', '-c', self._PRINT_IS_TRACING,
                                     PYTHONTRACEMALLOC='1')
        self.assertEqual(out.rstrip(), b'False')

    def test_env_var_disabled(self):
        # PYTHONTRACEMALLOC=0: not tracing at startup
        _, out, _ = assert_python_ok('-c', self._PRINT_IS_TRACING,
                                     PYTHONTRACEMALLOC='0')
        self.assertEqual(out.rstrip(), b'False')

    def test_env_var_enabled_at_startup(self):
        # PYTHONTRACEMALLOC=1: tracing at startup
        _, out, _ = assert_python_ok('-c', self._PRINT_IS_TRACING,
                                     PYTHONTRACEMALLOC='1')
        self.assertEqual(out.rstrip(), b'True')

    def test_env_limit(self):
        # start and set the number of frames
        _, out, _ = assert_python_ok('-c', self._PRINT_LIMIT,
                                     PYTHONTRACEMALLOC='10')
        self.assertEqual(out.rstrip(), b'10')

    def check_env_var_invalid(self, nframe):
        # The interpreter must fail to start and report the invalid value
        # with one of the two accepted messages.
        with support.SuppressCrashReport():
            _, _, err = assert_python_failure(
                '-c', 'pass',
                PYTHONTRACEMALLOC=str(nframe))

        accepted = (
            b'ValueError: the number of frames must be in range',
            b'PYTHONTRACEMALLOC: invalid number of frames',
        )
        if not any(message in err for message in accepted):
            self.fail(f"unexpected output: {err!a}")

    def test_env_var_invalid(self):
        for nframe in INVALID_NFRAME:
            with self.subTest(nframe=nframe):
                self.check_env_var_invalid(nframe)

    def test_sys_xoptions(self):
        # -X option value -> traceback limit reported by the child
        expectations = (
            ('tracemalloc', 1),
            ('tracemalloc=1', 1),
            ('tracemalloc=15', 15),
        )
        for xoptions, nframe in expectations:
            with self.subTest(xoptions=xoptions, nframe=nframe):
                _, out, _ = assert_python_ok('-X', xoptions,
                                             '-c', self._PRINT_LIMIT)
                self.assertEqual(out.rstrip(), str(nframe).encode('ascii'))

    def check_sys_xoptions_invalid(self, nframe):
        # Same check as check_env_var_invalid(), for the -X option.
        cmd_args = ('-X', 'tracemalloc=%s' % nframe, '-c', 'pass')
        with support.SuppressCrashReport():
            _, _, err = assert_python_failure(*cmd_args)

        accepted = (
            b'ValueError: the number of frames must be in range',
            b'-X tracemalloc=NFRAME: invalid number of frames',
        )
        if not any(message in err for message in accepted):
            self.fail(f"unexpected output: {err!a}")

    def test_sys_xoptions_invalid(self):
        for nframe in INVALID_NFRAME:
            with self.subTest(nframe=nframe):
                self.check_sys_xoptions_invalid(nframe)

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def test_pymem_alloc0(self):
        # Issue #21639: Check that PyMem_Malloc(0) with tracemalloc enabled
        # does not crash.
        code = 'import _testcapi; _testcapi.test_pymem_alloc0(); 1'
        assert_python_ok('-X', 'tracemalloc', '-c', code)
989
990
@unittest.skipIf(_testcapi is None, 'need _testcapi')
class TestCAPI(unittest.TestCase):
    """Tests of the track/untrack tracemalloc helpers exposed by _testcapi.

    Each test registers (or unregisters) a trace for the memory block of a
    bytes object in a dedicated domain and checks what tracemalloc reports
    for it.
    """

    maxDiff = 80 * 20

    def setUp(self):
        if tracemalloc.is_tracing():
            self.skipTest("tracemalloc must be stopped before the test")

        self.domain = 5
        self.size = 123
        # Keep a reference to the object so that self.ptr stays valid.
        self.obj = allocate_bytes(self.size)[0]

        # for the type "object", id(obj) is the address of its memory block.
        # This type is not tracked by the garbage collector
        self.ptr = id(self.obj)

    def tearDown(self):
        # Always leave tracemalloc stopped, whatever the test did.
        tracemalloc.stop()

    def get_traceback(self):
        """Return the Traceback recorded for (self.domain, self.ptr).

        Return None when _testcapi reports no traceback for the pointer.
        """
        frames = _testcapi.tracemalloc_get_traceback(self.domain, self.ptr)
        if frames is None:
            return None
        return tracemalloc.Traceback(frames)

    def track(self, release_gil=False, nframe=1):
        """Track self.ptr in self.domain and return the expected frames.

        The returned tuple of (filename, lineno) pairs is what the tests
        compare against the traceback stored by tracemalloc.
        """
        # get_frames(nframe, 1) reports the first frame one line below its
        # own call, i.e. on the tracemalloc_track() call: the two statements
        # must stay on consecutive lines.
        frames = get_frames(nframe, 1)
        _testcapi.tracemalloc_track(self.domain, self.ptr, self.size,
                                    release_gil)
        return frames

    def untrack(self):
        """Remove the trace of self.ptr from self.domain."""
        _testcapi.tracemalloc_untrack(self.domain, self.ptr)

    def get_traced_memory(self):
        """Return the total size of the traces recorded in self.domain."""
        # Get the traced size in the domain
        snapshot = tracemalloc.take_snapshot()
        domain_filter = tracemalloc.DomainFilter(True, self.domain)
        snapshot = snapshot.filter_traces([domain_filter])
        return sum(trace.size for trace in snapshot.traces)

    def check_track(self, release_gil):
        """Track the pointer and check the recorded traceback and size."""
        nframe = 5
        tracemalloc.start(nframe)

        frames = self.track(release_gil, nframe)
        self.assertEqual(self.get_traceback(),
                         tracemalloc.Traceback(frames))

        self.assertEqual(self.get_traced_memory(), self.size)

    def test_track(self):
        self.check_track(False)

    def test_track_without_gil(self):
        # check that calling _PyTraceMalloc_Track() without holding the GIL
        # works too
        self.check_track(True)

    def test_track_already_tracked(self):
        nframe = 5
        tracemalloc.start(nframe)

        # track a first time
        self.track()

        # calling _PyTraceMalloc_Track() must remove the old trace and add
        # a new trace with the new traceback
        frames = self.track(nframe=nframe)
        self.assertEqual(self.get_traceback(),
                         tracemalloc.Traceback(frames))

    def test_untrack(self):
        tracemalloc.start()

        self.track()
        self.assertIsNotNone(self.get_traceback())
        self.assertEqual(self.get_traced_memory(), self.size)

        # untrack must remove the trace
        self.untrack()
        self.assertIsNone(self.get_traceback())
        self.assertEqual(self.get_traced_memory(), 0)

        # calling _PyTraceMalloc_Untrack() multiple times must not crash
        self.untrack()
        self.untrack()

    def test_stop_track(self):
        tracemalloc.start()
        tracemalloc.stop()

        # tracking while tracemalloc is stopped must fail and record nothing
        with self.assertRaises(RuntimeError):
            self.track()
        self.assertIsNone(self.get_traceback())

    def test_stop_untrack(self):
        tracemalloc.start()
        self.track()

        # untracking while tracemalloc is stopped must fail
        tracemalloc.stop()
        with self.assertRaises(RuntimeError):
            self.untrack()
1098
# Run the tests of this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()