python (3.12.0)
1 import contextlib
2 import os
3 import threading
4 from textwrap import dedent
5 import unittest
6 import time
7
8 from test import support
9 from test.support import import_helper
10 from test.support import threading_helper
11 _interpreters = import_helper.import_module('_xxsubinterpreters')
12 _channels = import_helper.import_module('_xxinterpchannels')
13 from test.support import interpreters
14
15
16 def _captured_script(script):
17 r, w = os.pipe()
18 indented = script.replace('\n', '\n ')
19 wrapped = dedent(f"""
20 import contextlib
21 with open({w}, 'w', encoding='utf-8') as spipe:
22 with contextlib.redirect_stdout(spipe):
23 {indented}
24 """)
25 return wrapped, open(r, encoding='utf-8')
26
27
def clean_up_interpreters():
    """Close every interpreter except the main one (id 0).

    Interpreters that raise RuntimeError on close (already destroyed)
    are silently skipped.
    """
    for candidate in interpreters.list_all():
        if candidate.id != 0:  # never touch main
            with contextlib.suppress(RuntimeError):  # already destroyed
                candidate.close()
36
37
def _run_output(interp, request, channels=None):
    """Run *request* in *interp* and return the text it wrote to stdout.

    *channels* is forwarded unchanged to ``interp.run()``.
    """
    wrapped, reader = _captured_script(request)
    try:
        interp.run(wrapped, channels=channels)
        return reader.read()
    finally:
        # Close the read end whether or not the run succeeded.
        reader.close()
43
44
45 @contextlib.contextmanager
46 def _running(interp):
47 r, w = os.pipe()
48 def run():
49 interp.run(dedent(f"""
50 # wait for "signal"
51 with open({r}) as rpipe:
52 rpipe.read()
53 """))
54
55 t = threading.Thread(target=run)
56 t.start()
57
58 yield
59
60 with open(w, 'w') as spipe:
61 spipe.write('done')
62 t.join()
63
64
class TestBase(unittest.TestCase):
    """Shared base class that cleans up subinterpreters after each test."""

    def tearDown(self):
        # Close whatever non-main interpreters the test left behind.
        clean_up_interpreters()
69
70
class CreateTests(TestBase):
    """Checks for interpreters.create() called from different contexts."""

    def test_in_main(self):
        created = interpreters.create()
        self.assertIsInstance(created, interpreters.Interpreter)
        self.assertIn(created, interpreters.list_all())

    def test_in_thread(self):
        gate = threading.Lock()
        interp = None

        def create_in_thread():
            nonlocal interp
            interp = interpreters.create()
            # Block until the main thread has released the lock.
            with gate:
                pass

        worker = threading.Thread(target=create_in_thread)
        with gate:
            worker.start()
        worker.join()
        self.assertIn(interp, interpreters.list_all())

    def test_in_subinterpreter(self):
        [main] = interpreters.list_all()
        outer = interpreters.create()
        script = dedent("""
            from test.support import interpreters
            interp = interpreters.create()
            print(interp.id)
            """)
        out = _run_output(outer, script)
        inner = interpreters.Interpreter(int(out))
        self.assertEqual(interpreters.list_all(), [main, outer, inner])

    def test_after_destroy_all(self):
        before = set(interpreters.list_all())
        # Create 3 subinterpreters.
        doomed = [interpreters.create() for _ in range(3)]
        # Now destroy them.
        for victim in doomed:
            victim.close()
        # Finally, create another.
        survivor = interpreters.create()
        self.assertEqual(set(interpreters.list_all()), before | {survivor})

    def test_after_destroy_some(self):
        before = set(interpreters.list_all())
        # Create 3 subinterpreters.
        first = interpreters.create()
        second = interpreters.create()
        kept = interpreters.create()
        # Now destroy 2 of them.
        for victim in (first, second):
            victim.close()
        # Finally, create another.
        latest = interpreters.create()
        self.assertEqual(set(interpreters.list_all()),
                         before | {kept, latest})
129
130
class GetCurrentTests(TestBase):
    """Checks for interpreters.get_current()."""

    def test_main(self):
        main = interpreters.get_main()
        current = interpreters.get_current()
        self.assertEqual(current, main)

    def test_subinterpreter(self):
        # Use the high-level get_main() (as test_main does) so that both
        # operands of the comparisons below are Interpreter objects.
        # NOTE(review): the previous code took `main` from the low-level
        # `_interpreters` module; comparing that value with an Interpreter
        # presumably never compares equal, which would make the
        # assertNotEqual pass unconditionally -- confirm.
        main = interpreters.get_main()
        interp = interpreters.create()
        out = _run_output(interp, dedent("""
            from test.support import interpreters
            cur = interpreters.get_current()
            print(cur.id)
            """))
        current = interpreters.Interpreter(int(out))
        # The script must have seen the interpreter it was run in ...
        self.assertEqual(current, interp)
        # ... which is not the main interpreter.
        self.assertNotEqual(current, main)
147 self.assertNotEqual(current, main)
148
149
class ListAllTests(TestBase):
    """Checks for interpreters.list_all()."""

    def test_initial(self):
        self.assertEqual(1, len(interpreters.list_all()))

    def test_after_creating(self):
        main = interpreters.get_current()
        first = interpreters.create()
        second = interpreters.create()

        listed_ids = [each.id for each in interpreters.list_all()]

        self.assertEqual(listed_ids, [main.id, first.id, second.id])

    def test_after_destroying(self):
        main = interpreters.get_current()
        first = interpreters.create()
        second = interpreters.create()
        first.close()

        listed_ids = [each.id for each in interpreters.list_all()]

        self.assertEqual(listed_ids, [main.id, second.id])
178
179
class TestInterpreterAttrs(TestBase):
    """Checks for Interpreter's ``id`` and ``isolated`` attributes and ==."""

    def test_id_type(self):
        candidates = (
            interpreters.get_main(),
            interpreters.get_current(),
            interpreters.create(),
        )
        for candidate in candidates:
            self.assertIsInstance(candidate.id, _interpreters.InterpreterID)

    def test_main_id(self):
        self.assertEqual(interpreters.get_main().id, 0)

    def test_custom_id(self):
        wrapper = interpreters.Interpreter(1)
        self.assertEqual(wrapper.id, 1)

        # A string is not accepted as an ID.
        with self.assertRaises(TypeError):
            interpreters.Interpreter('1')

    def test_id_readonly(self):
        wrapper = interpreters.Interpreter(1)
        with self.assertRaises(AttributeError):
            wrapper.id = 2

    @unittest.skip('not ready yet (see bpo-32604)')
    def test_main_isolated(self):
        self.assertFalse(interpreters.get_main().isolated)

    @unittest.skip('not ready yet (see bpo-32604)')
    def test_subinterpreter_isolated_default(self):
        self.assertFalse(interpreters.create().isolated)

    def test_subinterpreter_isolated_explicit(self):
        isolated = interpreters.create(isolated=True)
        shared = interpreters.create(isolated=False)
        self.assertTrue(isolated.isolated)
        self.assertFalse(shared.isolated)

    @unittest.skip('not ready yet (see bpo-32604)')
    def test_custom_isolated_default(self):
        self.assertFalse(interpreters.Interpreter(1).isolated)

    def test_custom_isolated_explicit(self):
        isolated = interpreters.Interpreter(1, isolated=True)
        shared = interpreters.Interpreter(1, isolated=False)
        self.assertTrue(isolated.isolated)
        self.assertFalse(shared.isolated)

    def test_isolated_readonly(self):
        wrapper = interpreters.Interpreter(1)
        with self.assertRaises(AttributeError):
            wrapper.isolated = True

    def test_equality(self):
        left = interpreters.create()
        right = interpreters.create()
        self.assertEqual(left, left)
        self.assertNotEqual(left, right)
243
244
class TestInterpreterIsRunning(TestBase):
    """Checks for Interpreter.is_running()."""

    def test_main(self):
        self.assertTrue(interpreters.get_main().is_running())

    @unittest.skip('Fails on FreeBSD')
    def test_subinterpreter(self):
        sub = interpreters.create()
        self.assertFalse(sub.is_running())

        with _running(sub):
            self.assertTrue(sub.is_running())
        self.assertFalse(sub.is_running())

    def test_from_subinterpreter(self):
        interp = interpreters.create()
        # The script asks, from inside the interpreter, whether that same
        # interpreter is running.
        script = dedent(f"""
            import _xxsubinterpreters as _interpreters
            if _interpreters.is_running({interp.id}):
                print(True)
            else:
                print(False)
            """)
        out = _run_output(interp, script)
        self.assertEqual(out.strip(), 'True')

    def test_already_destroyed(self):
        gone = interpreters.create()
        gone.close()
        with self.assertRaises(RuntimeError):
            gone.is_running()

    def test_does_not_exist(self):
        missing = interpreters.Interpreter(1_000_000)
        with self.assertRaises(RuntimeError):
            missing.is_running()

    def test_bad_id(self):
        invalid = interpreters.Interpreter(-1)
        with self.assertRaises(ValueError):
            invalid.is_running()
286
287
class TestInterpreterClose(TestBase):
    """Checks for Interpreter.close()."""

    def _run_in_thread(self, func):
        """Call *func* in a new thread and re-raise here whatever it raised.

        An exception escaping a thread's target is not propagated to the
        thread that joins it, so a failed assertion (or any other error)
        inside the worker would otherwise not fail the test.
        """
        failures = []

        def target():
            try:
                func()
            except Exception as exc:
                failures.append(exc)

        t = threading.Thread(target=target)
        t.start()
        t.join()
        if failures:
            raise failures[0]

    def test_basic(self):
        main = interpreters.get_main()
        interp1 = interpreters.create()
        interp2 = interpreters.create()
        interp3 = interpreters.create()
        self.assertEqual(set(interpreters.list_all()),
                         {main, interp1, interp2, interp3})
        interp2.close()
        self.assertEqual(set(interpreters.list_all()),
                         {main, interp1, interp3})

    def test_all(self):
        before = set(interpreters.list_all())
        interps = {interpreters.create() for _ in range(3)}
        self.assertEqual(set(interpreters.list_all()), before | interps)
        for interp in interps:
            interp.close()
        self.assertEqual(set(interpreters.list_all()), before)

    def test_main(self):
        main, = interpreters.list_all()
        with self.assertRaises(RuntimeError):
            main.close()

        def f():
            with self.assertRaises(RuntimeError):
                main.close()

        # Run the same check from another thread; a failure there is
        # re-raised in this thread so that it fails the test.
        self._run_in_thread(f)

    def test_already_destroyed(self):
        interp = interpreters.create()
        interp.close()
        with self.assertRaises(RuntimeError):
            interp.close()

    def test_does_not_exist(self):
        interp = interpreters.Interpreter(1_000_000)
        with self.assertRaises(RuntimeError):
            interp.close()

    def test_bad_id(self):
        interp = interpreters.Interpreter(-1)
        with self.assertRaises(ValueError):
            interp.close()

    def test_from_current(self):
        main, = interpreters.list_all()
        interp = interpreters.create()
        # The script tries to close the interpreter it is running in and
        # prints 'failed' when that raises RuntimeError.
        out = _run_output(interp, dedent(f"""
            from test.support import interpreters
            interp = interpreters.Interpreter({int(interp.id)})
            try:
                interp.close()
            except RuntimeError:
                print('failed')
            """))
        self.assertEqual(out.strip(), 'failed')
        self.assertEqual(set(interpreters.list_all()), {main, interp})

    def test_from_sibling(self):
        main, = interpreters.list_all()
        interp1 = interpreters.create()
        interp2 = interpreters.create()
        self.assertEqual(set(interpreters.list_all()),
                         {main, interp1, interp2})
        interp1.run(dedent(f"""
            from test.support import interpreters
            interp2 = interpreters.Interpreter(int({interp2.id}))
            interp2.close()
            interp3 = interpreters.create()
            interp3.close()
            """))
        self.assertEqual(set(interpreters.list_all()), {main, interp1})

    def test_from_other_thread(self):
        interp = interpreters.create()

        def f():
            interp.close()

        # An error raised by close() in the worker is re-raised here
        # instead of being lost with the thread.
        self._run_in_thread(f)

    @unittest.skip('Fails on FreeBSD')
    def test_still_running(self):
        main, = interpreters.list_all()
        interp = interpreters.create()
        with _running(interp):
            with self.assertRaises(RuntimeError):
                interp.close()
            self.assertTrue(interp.is_running())
387
388
class TestInterpreterRun(TestBase):
    """Checks for Interpreter.run()."""

    def test_success(self):
        interp = interpreters.create()
        script, captured = _captured_script('print("it worked!", end="")')
        with captured:
            interp.run(script)
            out = captured.read()

        self.assertEqual(out, 'it worked!')

    def test_in_thread(self):
        interp = interpreters.create()
        script, captured = _captured_script('print("it worked!", end="")')
        with captured:
            worker = threading.Thread(target=lambda: interp.run(script))
            worker.start()
            worker.join()
            out = captured.read()

        self.assertEqual(out, 'it worked!')

    @support.requires_fork()
    def test_fork(self):
        interp = interpreters.create()
        import tempfile
        with tempfile.NamedTemporaryFile('w+', encoding='utf-8') as tmp:
            tmp.write('')
            tmp.flush()

            expected = 'spam spam spam spam spam'
            # The script writes *expected* to the temp file only if
            # os.fork() raises RuntimeError.
            script = dedent(f"""
                import os
                try:
                    os.fork()
                except RuntimeError:
                    with open('{tmp.name}', 'w', encoding='utf-8') as out:
                        out.write('{expected}')
                """)
            interp.run(script)

            tmp.seek(0)
            self.assertEqual(tmp.read(), expected)

    @unittest.skip('Fails on FreeBSD')
    def test_already_running(self):
        interp = interpreters.create()
        with _running(interp):
            with self.assertRaises(RuntimeError):
                interp.run('print("spam")')

    def test_does_not_exist(self):
        missing = interpreters.Interpreter(1_000_000)
        with self.assertRaises(RuntimeError):
            missing.run('print("spam")')

    def test_bad_id(self):
        invalid = interpreters.Interpreter(-1)
        with self.assertRaises(ValueError):
            invalid.run('print("spam")')

    def test_bad_script(self):
        interp = interpreters.create()
        with self.assertRaises(TypeError):
            interp.run(10)

    def test_bytes_for_script(self):
        interp = interpreters.create()
        with self.assertRaises(TypeError):
            interp.run(b'print("spam")')

    # test_xxsubinterpreters covers the remaining Interpreter.run() behavior.
465
466
@unittest.skip('these are crashing, likely just due just to _xxsubinterpreters (see gh-105699)')
class StressTests(TestBase):
    """Create many interpreters, sequentially and from many threads."""

    # In these tests we generally want a lot of interpreters,
    # but not so many that any test takes too long.

    @support.requires_resource('cpu')
    def test_create_many_sequential(self):
        # Hold a reference to every interpreter that gets created.
        alive = []
        for _ in range(100):
            alive.append(interpreters.create())

    @support.requires_resource('cpu')
    def test_create_many_threaded(self):
        alive = []

        def task():
            alive.append(interpreters.create())

        workers = (threading.Thread(target=task) for _ in range(200))
        with threading_helper.start_threads(workers):
            pass
489
490
class TestIsShareable(TestBase):
    """Checks for interpreters.is_shareable()."""

    def test_default_shareables(self):
        shareables = [
            # singletons
            None,
            # builtin objects
            b'spam',
            'spam',
            10,
            -10,
        ]
        for candidate in shareables:
            with self.subTest(candidate):
                verdict = interpreters.is_shareable(candidate)
                self.assertTrue(verdict)

    def test_not_shareable(self):
        class Cheese:
            def __init__(self, name):
                self.name = name

            def __str__(self):
                return self.name

        class SubBytes(bytes):
            """A subclass of a shareable type."""

        not_shareables = [
            # singletons
            True,
            False,
            NotImplemented,
            ...,
            # builtin types and objects
            type,
            object,
            object(),
            Exception(),
            100.0,
            # user-defined types and objects
            Cheese,
            Cheese('Wensleydale'),
            SubBytes(b'spam'),
        ]
        for candidate in not_shareables:
            with self.subTest(repr(candidate)):
                verdict = interpreters.is_shareable(candidate)
                self.assertFalse(verdict)
539
540
class TestChannels(TestBase):
    """Checks for create_channel() and list_all_channels()."""

    def test_create(self):
        recv_end, send_end = interpreters.create_channel()
        self.assertIsInstance(recv_end, interpreters.RecvChannel)
        self.assertIsInstance(send_end, interpreters.SendChannel)

    def test_list_all(self):
        self.assertEqual(interpreters.list_all_channels(), [])
        # Each create_channel() result is stored as returned.
        created = {interpreters.create_channel() for _ in range(3)}
        self.assertEqual(set(interpreters.list_all_channels()), created)
556
557
class TestRecvChannelAttrs(TestBase):
    """Checks for RecvChannel's ``id`` attribute and equality."""

    def test_id_type(self):
        recv_end, _ = interpreters.create_channel()
        self.assertIsInstance(recv_end.id, _channels.ChannelID)

    def test_custom_id(self):
        recv_end = interpreters.RecvChannel(1)
        self.assertEqual(recv_end.id, 1)

        # A string is not accepted as an ID.
        with self.assertRaises(TypeError):
            interpreters.RecvChannel('1')

    def test_id_readonly(self):
        recv_end = interpreters.RecvChannel(1)
        with self.assertRaises(AttributeError):
            recv_end.id = 2

    def test_equality(self):
        left, _ = interpreters.create_channel()
        right, _ = interpreters.create_channel()
        self.assertEqual(left, left)
        self.assertNotEqual(left, right)
581
582
class TestSendChannelAttrs(TestBase):
    """Checks for SendChannel's ``id`` attribute and equality."""

    def test_id_type(self):
        _, send_end = interpreters.create_channel()
        self.assertIsInstance(send_end.id, _channels.ChannelID)

    def test_custom_id(self):
        send_end = interpreters.SendChannel(1)
        self.assertEqual(send_end.id, 1)

        # A string is not accepted as an ID.
        with self.assertRaises(TypeError):
            interpreters.SendChannel('1')

    def test_id_readonly(self):
        send_end = interpreters.SendChannel(1)
        with self.assertRaises(AttributeError):
            send_end.id = 2

    def test_equality(self):
        _, left = interpreters.create_channel()
        _, right = interpreters.create_channel()
        self.assertEqual(left, left)
        self.assertNotEqual(left, right)
606
607
class TestSendRecv(TestBase):
    """Checks for sending and receiving objects over channels."""

    def test_send_recv_main(self):
        r, s = interpreters.create_channel()
        orig = b'spam'
        s.send_nowait(orig)
        obj = r.recv()

        # The received object is an equal copy, not the object sent.
        self.assertEqual(obj, orig)
        self.assertIsNot(obj, orig)

    def test_send_recv_same_interpreter(self):
        interp = interpreters.create()
        interp.run(dedent("""
            from test.support import interpreters
            r, s = interpreters.create_channel()
            orig = b'spam'
            s.send_nowait(orig)
            obj = r.recv()
            assert obj == orig, 'expected: obj == orig'
            assert obj is not orig, 'expected: obj is not orig'
            """))

    @unittest.skip('broken (see BPO-...)')
    def test_send_recv_different_interpreters(self):
        r1, s1 = interpreters.create_channel()
        r2, s2 = interpreters.create_channel()
        orig1 = b'spam'
        s1.send_nowait(orig1)
        out = _run_output(
            interpreters.create(),
            dedent(f"""
                obj1 = r.recv()
                assert obj1 == b'spam', 'expected: obj1 == orig1'
                # When going to another interpreter we get a copy.
                assert id(obj1) != {id(orig1)}, 'expected: obj1 is not orig1'
                orig2 = b'eggs'
                print(id(orig2))
                s.send_nowait(orig2)
                """),
            channels=dict(r=r1, s=s2),
        )
        obj2 = r2.recv()

        self.assertEqual(obj2, b'eggs')
        self.assertNotEqual(id(obj2), int(out))

    def test_send_recv_different_threads(self):
        r, s = interpreters.create_channel()

        def f():
            # Poll until something arrives, then send it straight back.
            while True:
                try:
                    obj = r.recv()
                except interpreters.ChannelEmptyError:
                    time.sleep(0.1)
                else:
                    break
            s.send(obj)

        t = threading.Thread(target=f)
        t.start()

        orig = b'spam'
        s.send(orig)
        t.join()
        obj = r.recv()

        self.assertEqual(obj, orig)
        self.assertIsNot(obj, orig)

    def test_send_recv_nowait_main(self):
        r, s = interpreters.create_channel()
        orig = b'spam'
        s.send_nowait(orig)
        obj = r.recv_nowait()

        self.assertEqual(obj, orig)
        self.assertIsNot(obj, orig)

    def test_send_recv_nowait_main_with_default(self):
        r, _ = interpreters.create_channel()
        # Nothing was sent, so the supplied default comes back.
        obj = r.recv_nowait(None)

        self.assertIsNone(obj)

    def test_send_recv_nowait_same_interpreter(self):
        interp = interpreters.create()
        interp.run(dedent("""
            from test.support import interpreters
            r, s = interpreters.create_channel()
            orig = b'spam'
            s.send_nowait(orig)
            obj = r.recv_nowait()
            assert obj == orig, 'expected: obj == orig'
            # Even within the same interpreter we get a copy back.
            assert obj is not orig, 'expected: obj is not orig'
            """))

    @unittest.skip('broken (see BPO-...)')
    def test_send_recv_nowait_different_interpreters(self):
        r1, s1 = interpreters.create_channel()
        r2, s2 = interpreters.create_channel()
        orig1 = b'spam'
        s1.send_nowait(orig1)
        out = _run_output(
            interpreters.create(),
            dedent(f"""
                obj1 = r.recv_nowait()
                assert obj1 == b'spam', 'expected: obj1 == orig1'
                # When going to another interpreter we get a copy.
                assert id(obj1) != {id(orig1)}, 'expected: obj1 is not orig1'
                orig2 = b'eggs'
                print(id(orig2))
                s.send_nowait(orig2)
                """),
            channels=dict(r=r1, s=s2),
        )
        obj2 = r2.recv_nowait()

        self.assertEqual(obj2, b'eggs')
        self.assertNotEqual(id(obj2), int(out))

    def test_recv_channel_does_not_exist(self):
        ch = interpreters.RecvChannel(1_000_000)
        with self.assertRaises(interpreters.ChannelNotFoundError):
            ch.recv()

    def test_send_channel_does_not_exist(self):
        ch = interpreters.SendChannel(1_000_000)
        with self.assertRaises(interpreters.ChannelNotFoundError):
            ch.send(b'spam')

    def test_recv_nowait_channel_does_not_exist(self):
        ch = interpreters.RecvChannel(1_000_000)
        with self.assertRaises(interpreters.ChannelNotFoundError):
            ch.recv_nowait()

    def test_send_nowait_channel_does_not_exist(self):
        ch = interpreters.SendChannel(1_000_000)
        with self.assertRaises(interpreters.ChannelNotFoundError):
            ch.send_nowait(b'spam')

    def test_recv_nowait_empty(self):
        ch, _ = interpreters.create_channel()
        with self.assertRaises(interpreters.ChannelEmptyError):
            ch.recv_nowait()

    def test_recv_nowait_default(self):
        default = object()
        rch, sch = interpreters.create_channel()
        # Empty channel: the default is returned.
        obj1 = rch.recv_nowait(default)
        sch.send_nowait(None)
        sch.send_nowait(1)
        sch.send_nowait(b'spam')
        sch.send_nowait(b'eggs')
        # Queued items come back in sending order, whether or not a
        # default is supplied.
        obj2 = rch.recv_nowait(default)
        obj3 = rch.recv_nowait(default)
        obj4 = rch.recv_nowait()
        obj5 = rch.recv_nowait(default)
        # Drained again: back to the default.
        obj6 = rch.recv_nowait(default)

        self.assertIs(obj1, default)
        self.assertIs(obj2, None)
        self.assertEqual(obj3, 1)
        self.assertEqual(obj4, b'spam')
        self.assertEqual(obj5, b'eggs')
        self.assertIs(obj6, default)
773 self.assertIs(obj6, default)