1 import contextlib
2 import itertools
3 import os
4 import pickle
5 import sys
6 from textwrap import dedent
7 import threading
8 import unittest
9
10 import _testcapi
11 from test import support
12 from test.support import import_helper
13 from test.support import script_helper
14
15
16 interpreters = import_helper.import_module('_xxsubinterpreters')
17
18
19 ##################################
20 # helpers
21
22 def _captured_script(script):
23 r, w = os.pipe()
24 indented = script.replace('\n', '\n ')
25 wrapped = dedent(f"""
26 import contextlib
27 with open({w}, 'w', encoding="utf-8") as spipe:
28 with contextlib.redirect_stdout(spipe):
29 {indented}
30 """)
31 return wrapped, open(r, encoding="utf-8")
32
33
def _run_output(interp, request, shared=None):
    """Run *request* in *interp* and return what it wrote to stdout.

    *shared* is passed through unchanged to interpreters.run_string().
    """
    wrapped, reader = _captured_script(request)
    with reader:
        interpreters.run_string(interp, wrapped, shared)
        output = reader.read()
    return output
39
40
def _wait_for_interp_to_run(interp, timeout=None):
    """Poll until *interp* reports that it is running.

    *timeout* defaults to support.SHORT_TIMEOUT.  Raises RuntimeError if
    the retry loop ends without the interpreter ever being seen running.
    """
    # bpo-37224: Running this test file in multiple processes fails
    # randomly, because the thread that runs the subinterpreter may not
    # get the CPU earlier than the main thread does.
    if timeout is None:
        timeout = support.SHORT_TIMEOUT
    for _ in support.sleeping_retry(timeout, error=False):
        if interpreters.is_running(interp):
            return
    raise RuntimeError('interp is not running')
52
53
@contextlib.contextmanager
def _running(interp):
    """Keep *interp* busy for the duration of the ``with`` block.

    A helper thread runs a script in *interp* that blocks reading from a
    pipe.  When the block is left, the pipe is written to and closed so
    the script can finish, and the helper thread is joined.

    The release happens in a ``finally`` clause so that an exception
    raised inside the block (for example a failed assertion), or a
    RuntimeError from waiting for the interpreter to start, cannot leave
    the helper thread blocked on the pipe.
    """
    r, w = os.pipe()
    def run():
        interpreters.run_string(interp, dedent(f"""
            # wait for "signal"
            with open({r}, encoding="utf-8") as rpipe:
                rpipe.read()
            """))

    t = threading.Thread(target=run)
    t.start()
    try:
        _wait_for_interp_to_run(interp)
        yield
    finally:
        # Writing and then closing the write end lets the blocked read()
        # in the subinterpreter return.
        with open(w, 'w', encoding="utf-8") as spipe:
            spipe.write('done')
        t.join()
73
74
def clean_up_interpreters():
    """Destroy every interpreter listed by the module except ID 0 (main)."""
    for interp_id in interpreters.list_all():
        if interp_id != 0:  # leave the main interpreter alone
            # RuntimeError here means it was already destroyed.
            with contextlib.suppress(RuntimeError):
                interpreters.destroy(interp_id)
83
84
class TestBase(unittest.TestCase):
    """Base class for test cases that create subinterpreters."""

    def tearDown(self):
        # Destroy any subinterpreters the test left behind.
        clean_up_interpreters()
89
90
91 ##################################
92 # misc. tests
93
class IsShareableTests(unittest.TestCase):
    """Checks for interpreters.is_shareable()."""

    def test_default_shareables(self):
        # The None singleton, then one sample each of bytes and str and
        # a positive and a negative int.
        for obj in (None, b'spam', 'spam', 10, -10):
            with self.subTest(obj):
                self.assertTrue(interpreters.is_shareable(obj))

    def test_not_shareable(self):
        class Cheese:
            def __init__(self, name):
                self.name = name
            def __str__(self):
                return self.name

        class SubBytes(bytes):
            """A subclass of a shareable type."""

        singletons = [True, False, NotImplemented, ...]
        builtin_things = [type, object, object(), Exception(), 100.0]
        user_defined = [Cheese, Cheese('Wensleydale'), SubBytes(b'spam')]

        for obj in singletons + builtin_things + user_defined:
            with self.subTest(repr(obj)):
                self.assertFalse(interpreters.is_shareable(obj))
142
143
class ShareableTypeTests(unittest.TestCase):
    """Round-trip values through the _testcapi cross-interpreter data API."""

    @staticmethod
    def _round_trip(obj):
        """Convert *obj* to cross-interpreter data and back again."""
        xid = _testcapi.get_crossinterp_data(obj)
        return _testcapi.restore_crossinterp_data(xid)

    def _assert_values(self, values):
        """Check each value comes back equal and of exactly the same type."""
        for obj in values:
            with self.subTest(obj):
                got = self._round_trip(obj)

                self.assertEqual(got, obj)
                self.assertIs(type(got), type(obj))

    def test_singletons(self):
        obj = None
        with self.subTest(obj):
            got = self._round_trip(obj)

            # XXX What about between interpreters?
            self.assertIs(got, obj)

    def test_types(self):
        self._assert_values([b'spam', 9999])

    def test_bytes(self):
        samples = [i.to_bytes(2, 'little', signed=True)
                   for i in range(-1, 258)]
        self._assert_values(samples)

    def test_strs(self):
        self._assert_values(['hello world', '你好世界', ''])

    def test_int(self):
        samples = [*range(-1, 258), sys.maxsize, -sys.maxsize - 1]
        self._assert_values(samples)

    def test_non_shareable_int(self):
        # Values just outside [-sys.maxsize - 1, sys.maxsize], and one
        # far outside it.
        for i in (sys.maxsize + 1, -sys.maxsize - 2, 2**1000):
            with self.subTest(i):
                with self.assertRaises(OverflowError):
                    _testcapi.get_crossinterp_data(i)
191
192
class ModuleTests(TestBase):
    """Checks on importing the extension module."""

    def test_import_in_interpreter(self):
        # Run an import of the module inside a new subinterpreter.
        interp = interpreters.create()
        _run_output(interp, 'import _xxsubinterpreters as _interpreters')
200
201
202 ##################################
203 # interpreter tests
204
class ListAllTests(TestBase):
    """Tests for interpreters.list_all()."""

    def test_initial(self):
        # Before anything is created the list holds just the main ID.
        main = interpreters.get_main()
        self.assertEqual(interpreters.list_all(), [main])

    def test_after_creating(self):
        main = interpreters.get_main()
        created = [interpreters.create() for _ in range(2)]
        self.assertEqual(interpreters.list_all(), [main, *created])

    def test_after_destroying(self):
        main = interpreters.get_main()
        doomed = interpreters.create()
        survivor = interpreters.create()
        interpreters.destroy(doomed)
        self.assertEqual(interpreters.list_all(), [main, survivor])
226
227
class GetCurrentTests(TestBase):
    """Tests for interpreters.get_current()."""

    def test_main(self):
        current = interpreters.get_current()
        self.assertEqual(current, interpreters.get_main())
        self.assertIsInstance(current, interpreters.InterpreterID)

    def test_subinterpreter(self):
        main = interpreters.get_main()
        interp = interpreters.create()
        output = _run_output(interp, dedent("""
            import _xxsubinterpreters as _interpreters
            cur = _interpreters.get_current()
            print(cur)
            assert isinstance(cur, _interpreters.InterpreterID)
            """))
        reported = int(output.strip())
        # The unpacking also requires that exactly two interpreters exist.
        _, expected = interpreters.list_all()
        self.assertEqual(reported, expected)
        self.assertNotEqual(reported, main)
249
250
class GetMainTests(TestBase):
    """Tests for interpreters.get_main()."""

    def test_from_main(self):
        # The unpacking requires that only one interpreter exists.
        [only] = interpreters.list_all()
        main = interpreters.get_main()
        self.assertEqual(main, only)
        self.assertIsInstance(main, interpreters.InterpreterID)

    def test_from_subinterpreter(self):
        [only] = interpreters.list_all()
        interp = interpreters.create()
        output = _run_output(interp, dedent("""
            import _xxsubinterpreters as _interpreters
            main = _interpreters.get_main()
            print(main)
            assert isinstance(main, _interpreters.InterpreterID)
            """))
        reported = int(output.strip())
        self.assertEqual(reported, only)
270
271
class IsRunningTests(TestBase):
    """Tests for interpreters.is_running()."""

    def test_main(self):
        self.assertTrue(interpreters.is_running(interpreters.get_main()))

    @unittest.skip('Fails on FreeBSD')
    def test_subinterpreter(self):
        interp = interpreters.create()
        # Not running before, running inside the block, not running after.
        self.assertFalse(interpreters.is_running(interp))

        with _running(interp):
            self.assertTrue(interpreters.is_running(interp))
        self.assertFalse(interpreters.is_running(interp))

    def test_from_subinterpreter(self):
        interp = interpreters.create()
        # The subinterpreter asks about itself while executing the script.
        output = _run_output(interp, dedent(f"""
            import _xxsubinterpreters as _interpreters
            if _interpreters.is_running({interp}):
                print(True)
            else:
                print(False)
            """))
        self.assertEqual(output.strip(), 'True')

    def test_already_destroyed(self):
        interp = interpreters.create()
        interpreters.destroy(interp)
        with self.assertRaises(RuntimeError):
            interpreters.is_running(interp)

    def test_does_not_exist(self):
        missing = 1_000_000
        with self.assertRaises(RuntimeError):
            interpreters.is_running(missing)

    def test_bad_id(self):
        negative = -1
        with self.assertRaises(ValueError):
            interpreters.is_running(negative)
311
312
class InterpreterIDTests(TestBase):
    """Tests for the interpreters.InterpreterID type."""

    def test_with_int(self):
        interp_id = interpreters.InterpreterID(10, force=True)

        self.assertEqual(int(interp_id), 10)

    def test_coerce_id(self):
        # A str subclass that defines __index__ is accepted as an ID.
        class Int(str):
            def __index__(self):
                return 10

        interp_id = interpreters.InterpreterID(Int(), force=True)
        self.assertEqual(int(interp_id), 10)

    def test_bad_id(self):
        rejected = [
            (TypeError, object()),
            (TypeError, 10.0),
            (TypeError, '10'),
            (TypeError, b'10'),
            (ValueError, -1),
            (OverflowError, 2**64),
        ]
        for exctype, value in rejected:
            self.assertRaises(exctype, interpreters.InterpreterID, value)

    def test_does_not_exist(self):
        newest = interpreters.create()
        missing = int(newest) + 1
        with self.assertRaises(RuntimeError):
            interpreters.InterpreterID(missing)  # unforced

    def test_str(self):
        interp_id = interpreters.InterpreterID(10, force=True)
        self.assertEqual(str(interp_id), '10')

    def test_repr(self):
        interp_id = interpreters.InterpreterID(10, force=True)
        self.assertEqual(repr(interp_id), 'InterpreterID(10)')

    def test_equality(self):
        id1 = interpreters.create()
        id2 = interpreters.InterpreterID(int(id1))
        id3 = interpreters.create()
        as_int = int(id1)
        as_float = float(as_int)

        # The == and != operators are spelled out on purpose: they are
        # the behaviour under test.
        self.assertTrue(id1 == id1)
        self.assertTrue(id1 == id2)
        self.assertTrue(id1 == as_int)
        self.assertTrue(as_int == id1)
        self.assertTrue(id1 == as_float)
        self.assertTrue(as_float == id1)
        self.assertFalse(id1 == as_float + 0.1)
        self.assertFalse(id1 == str(as_int))
        self.assertFalse(id1 == 2**1000)
        self.assertFalse(id1 == float('inf'))
        self.assertFalse(id1 == 'spam')
        self.assertFalse(id1 == id3)

        self.assertFalse(id1 != id1)
        self.assertFalse(id1 != id2)
        self.assertTrue(id1 != id3)
370
371
class CreateTests(TestBase):
    """Tests for interpreters.create()."""

    def test_in_main(self):
        created = interpreters.create()
        self.assertIsInstance(created, interpreters.InterpreterID)

        self.assertIn(created, interpreters.list_all())

    @unittest.skip('enable this test when working on pystate.c')
    def test_unique_id(self):
        seen = set()
        for _ in range(100):
            created = interpreters.create()
            interpreters.destroy(created)
            seen.add(created)

        self.assertEqual(len(seen), 100)

    def test_in_thread(self):
        gate = threading.Lock()
        created = None
        def worker():
            nonlocal created
            created = interpreters.create()
            # Blocks until the main thread releases the lock.
            with gate:
                pass

        thread = threading.Thread(target=worker)
        with gate:
            thread.start()
        thread.join()
        self.assertIn(created, interpreters.list_all())

    def test_in_subinterpreter(self):
        main, = interpreters.list_all()
        parent = interpreters.create()
        output = _run_output(parent, dedent("""
            import _xxsubinterpreters as _interpreters
            id = _interpreters.create()
            print(id)
            assert isinstance(id, _interpreters.InterpreterID)
            """))
        child = int(output.strip())

        self.assertEqual(set(interpreters.list_all()), {main, parent, child})

    def test_in_threaded_subinterpreter(self):
        main, = interpreters.list_all()
        parent = interpreters.create()
        child = None
        def worker():
            nonlocal child
            output = _run_output(parent, dedent("""
                import _xxsubinterpreters as _interpreters
                id = _interpreters.create()
                print(id)
                """))
            child = int(output.strip())

        thread = threading.Thread(target=worker)
        thread.start()
        thread.join()

        self.assertEqual(set(interpreters.list_all()), {main, parent, child})

    def test_after_destroy_all(self):
        before = set(interpreters.list_all())
        # Create 3 subinterpreters, then destroy all of them.
        created = [interpreters.create() for _ in range(3)]
        for doomed in created:
            interpreters.destroy(doomed)
        # Finally, create another.
        latest = interpreters.create()
        self.assertEqual(set(interpreters.list_all()), before | {latest})

    def test_after_destroy_some(self):
        before = set(interpreters.list_all())
        # Create 3 subinterpreters.
        first = interpreters.create()
        kept = interpreters.create()
        third = interpreters.create()
        # Now destroy 2 of them.
        for doomed in (first, third):
            interpreters.destroy(doomed)
        # Finally, create another.
        latest = interpreters.create()
        self.assertEqual(set(interpreters.list_all()),
                         before | {latest, kept})
463
464
class DestroyTests(TestBase):
    """Tests for interpreters.destroy()."""

    def test_one(self):
        first = interpreters.create()
        middle = interpreters.create()
        last = interpreters.create()
        self.assertIn(middle, interpreters.list_all())
        interpreters.destroy(middle)
        # Only the destroyed interpreter disappears from the list.
        self.assertNotIn(middle, interpreters.list_all())
        self.assertIn(first, interpreters.list_all())
        self.assertIn(last, interpreters.list_all())

    def test_all(self):
        before = set(interpreters.list_all())
        created = {interpreters.create() for _ in range(3)}
        self.assertEqual(set(interpreters.list_all()), before | created)
        for doomed in created:
            interpreters.destroy(doomed)
        self.assertEqual(set(interpreters.list_all()), before)

    def test_main(self):
        main, = interpreters.list_all()
        with self.assertRaises(RuntimeError):
            interpreters.destroy(main)

        # Same attempt again, this time from another thread.
        def worker():
            with self.assertRaises(RuntimeError):
                interpreters.destroy(main)

        thread = threading.Thread(target=worker)
        thread.start()
        thread.join()

    def test_already_destroyed(self):
        interp = interpreters.create()
        interpreters.destroy(interp)
        with self.assertRaises(RuntimeError):
            interpreters.destroy(interp)

    def test_does_not_exist(self):
        missing = 1_000_000
        with self.assertRaises(RuntimeError):
            interpreters.destroy(missing)

    def test_bad_id(self):
        negative = -1
        with self.assertRaises(ValueError):
            interpreters.destroy(negative)

    def test_from_current(self):
        main, = interpreters.list_all()
        id = interpreters.create()
        # The script tries to destroy the interpreter it is running in.
        script = dedent(f"""
            import _xxsubinterpreters as _interpreters
            try:
                _interpreters.destroy({id})
            except RuntimeError:
                pass
            """)

        interpreters.run_string(id, script)
        remaining = set(interpreters.list_all())
        self.assertEqual(remaining, {main, id})

    def test_from_sibling(self):
        main, = interpreters.list_all()
        id1 = interpreters.create()
        id2 = interpreters.create()
        # id1 runs a script that destroys id2.
        script = dedent(f"""
            import _xxsubinterpreters as _interpreters
            _interpreters.destroy({id2})
            """)
        interpreters.run_string(id1, script)

        remaining = set(interpreters.list_all())
        self.assertEqual(remaining, {main, id1})

    def test_from_other_thread(self):
        interp = interpreters.create()
        def worker():
            interpreters.destroy(interp)

        thread = threading.Thread(target=worker)
        thread.start()
        thread.join()

    def test_still_running(self):
        # The unpacking requires that only the main interpreter exists.
        main, = interpreters.list_all()
        interp = interpreters.create()
        with _running(interp):
            self.assertTrue(interpreters.is_running(interp),
                            msg=f"Interp {interp} should be running before destruction.")

            with self.assertRaises(RuntimeError,
                                   msg=f"Should not be able to destroy interp {interp} while it's still running."):
                interpreters.destroy(interp)
            self.assertTrue(interpreters.is_running(interp))
561
562
class RunStringTests(TestBase):
    """Tests for interpreters.run_string()."""

    def setUp(self):
        super().setUp()
        # Each test gets a fresh subinterpreter to run scripts in.
        self.id = interpreters.create()

    def test_success(self):
        script, file = _captured_script('print("it worked!", end="")')
        with file:
            interpreters.run_string(self.id, script)
            out = file.read()

        self.assertEqual(out, 'it worked!')

    def test_in_thread(self):
        # Same as test_success, but run_string() is called from a thread.
        script, file = _captured_script('print("it worked!", end="")')
        with file:
            def f():
                interpreters.run_string(self.id, script)

            t = threading.Thread(target=f)
            t.start()
            t.join()
            out = file.read()

        self.assertEqual(out, 'it worked!')

    def test_create_thread(self):
        # The script itself starts and joins a thread that does the print.
        subinterp = interpreters.create()
        script, file = _captured_script("""
            import threading
            def f():
                print('it worked!', end='')

            t = threading.Thread(target=f)
            t.start()
            t.join()
            """)
        with file:
            interpreters.run_string(subinterp, script)
            out = file.read()

        self.assertEqual(out, 'it worked!')

    def test_create_daemon_thread(self):
        with self.subTest('isolated'):
            # With isolated=True the script is expected to hit a
            # RuntimeError and print the fallback text instead.
            expected = 'spam spam spam spam spam'
            subinterp = interpreters.create(isolated=True)
            script, file = _captured_script(f"""
                import threading
                def f():
                    print('it worked!', end='')

                try:
                    t = threading.Thread(target=f, daemon=True)
                    t.start()
                    t.join()
                except RuntimeError:
                    print('{expected}', end='')
                """)
            with file:
                interpreters.run_string(subinterp, script)
                out = file.read()

            self.assertEqual(out, expected)

        with self.subTest('not isolated'):
            # With isolated=False the daemon thread is expected to run.
            subinterp = interpreters.create(isolated=False)
            script, file = _captured_script("""
                import threading
                def f():
                    print('it worked!', end='')

                t = threading.Thread(target=f, daemon=True)
                t.start()
                t.join()
                """)
            with file:
                interpreters.run_string(subinterp, script)
                out = file.read()

            self.assertEqual(out, 'it worked!')

    def test_shareable_types(self):
        interp = interpreters.create()
        objects = [
            None,
            'spam',
            b'spam',
            42,
        ]
        for obj in objects:
            with self.subTest(obj):
                # The script compares the shared value with its repr.
                interpreters.run_string(
                    interp,
                    f'assert(obj == {obj!r})',
                    shared=dict(obj=obj),
                )

    def test_os_exec(self):
        # The script is expected to get RuntimeError from os.execl() and
        # print the fallback text.
        expected = 'spam spam spam spam spam'
        subinterp = interpreters.create()
        script, file = _captured_script(f"""
            import os, sys
            try:
                os.execl(sys.executable)
            except RuntimeError:
                print('{expected}', end='')
            """)
        with file:
            interpreters.run_string(subinterp, script)
            out = file.read()

        self.assertEqual(out, expected)

    @support.requires_fork()
    def test_fork(self):
        import tempfile
        with tempfile.NamedTemporaryFile('w+', encoding="utf-8") as file:
            file.write('')
            file.flush()

            # The script is expected to get RuntimeError from os.fork()
            # and report that by writing to the temporary file.
            expected = 'spam spam spam spam spam'
            script = dedent(f"""
                import os
                try:
                    os.fork()
                except RuntimeError:
                    with open('{file.name}', 'w', encoding='utf-8') as out:
                        out.write('{expected}')
                """)
            interpreters.run_string(self.id, script)

            file.seek(0)
            content = file.read()
            self.assertEqual(content, expected)

    def test_already_running(self):
        with _running(self.id):
            with self.assertRaises(RuntimeError):
                interpreters.run_string(self.id, 'print("spam")')

    def test_does_not_exist(self):
        # Find the smallest non-negative ID that is not currently listed.
        id = 0
        while id in interpreters.list_all():
            id += 1
        with self.assertRaises(RuntimeError):
            interpreters.run_string(id, 'print("spam")')

    def test_error_id(self):
        with self.assertRaises(ValueError):
            interpreters.run_string(-1, 'print("spam")')

    def test_bad_id(self):
        with self.assertRaises(TypeError):
            interpreters.run_string('spam', 'print("spam")')

    def test_bad_script(self):
        with self.assertRaises(TypeError):
            interpreters.run_string(self.id, 10)

    def test_bytes_for_script(self):
        with self.assertRaises(TypeError):
            interpreters.run_string(self.id, b'print("spam")')

    @contextlib.contextmanager
    def assert_run_failed(self, exctype, msg=None):
        """Assert that the block raises interpreters.RunFailedError.

        When *msg* is None, the part of str(exception) before the first
        ':' must equal str(exctype).  Otherwise the whole string must
        equal "<exctype>: <msg>".
        """
        with self.assertRaises(interpreters.RunFailedError) as caught:
            yield
        if msg is None:
            self.assertEqual(str(caught.exception).split(':')[0],
                             str(exctype))
        else:
            self.assertEqual(str(caught.exception),
                             "{}: {}".format(exctype, msg))

    def test_invalid_syntax(self):
        with self.assert_run_failed(SyntaxError):
            # missing close paren
            interpreters.run_string(self.id, 'print("spam"')

    def test_failure(self):
        with self.assert_run_failed(Exception, 'spam'):
            interpreters.run_string(self.id, 'raise Exception("spam")')

    def test_SystemExit(self):
        with self.assert_run_failed(SystemExit, '42'):
            interpreters.run_string(self.id, 'raise SystemExit(42)')

    def test_sys_exit(self):
        # First without an exit code, then with one.
        with self.assert_run_failed(SystemExit):
            interpreters.run_string(self.id, dedent("""
                import sys
                sys.exit()
                """))

        with self.assert_run_failed(SystemExit, '42'):
            interpreters.run_string(self.id, dedent("""
                import sys
                sys.exit(42)
                """))

    def test_with_shared(self):
        r, w = os.pipe()

        shared = {
                'spam': b'ham',
                'eggs': b'-1',
                'cheddar': None,
                }
        # The script rebinds two of the shared names and pickles its
        # namespace back to this process through the pipe.
        script = dedent(f"""
            eggs = int(eggs)
            spam = 42
            result = spam + eggs

            ns = dict(vars())
            del ns['__builtins__']
            import pickle
            with open({w}, 'wb') as chan:
                pickle.dump(ns, chan)
            """)
        interpreters.run_string(self.id, script, shared)
        with open(r, 'rb') as chan:
            ns = pickle.load(chan)

        self.assertEqual(ns['spam'], 42)
        self.assertEqual(ns['eggs'], -1)
        self.assertEqual(ns['result'], 41)
        self.assertIsNone(ns['cheddar'])

    def test_shared_overwrites(self):
        # First run: bind spam in the script and snapshot the namespace.
        interpreters.run_string(self.id, dedent("""
            spam = 'eggs'
            ns1 = dict(vars())
            del ns1['__builtins__']
            """))

        # Second run: pass spam in via *shared* and snapshot again.
        shared = {'spam': b'ham'}
        script = dedent("""
            ns2 = dict(vars())
            del ns2['__builtins__']
            """)
        interpreters.run_string(self.id, script, shared)

        # Third run: send the whole namespace back through a pipe.
        r, w = os.pipe()
        script = dedent(f"""
            ns = dict(vars())
            del ns['__builtins__']
            import pickle
            with open({w}, 'wb') as chan:
                pickle.dump(ns, chan)
            """)
        interpreters.run_string(self.id, script)
        with open(r, 'rb') as chan:
            ns = pickle.load(chan)

        self.assertEqual(ns['ns1']['spam'], 'eggs')
        self.assertEqual(ns['ns2']['spam'], b'ham')
        self.assertEqual(ns['spam'], b'ham')

    def test_shared_overwrites_default_vars(self):
        r, w = os.pipe()

        # A shared value may replace a name like __name__ in the script's
        # namespace.
        shared = {'__name__': b'not __main__'}
        script = dedent(f"""
            spam = 42

            ns = dict(vars())
            del ns['__builtins__']
            import pickle
            with open({w}, 'wb') as chan:
                pickle.dump(ns, chan)
            """)
        interpreters.run_string(self.id, script, shared)
        with open(r, 'rb') as chan:
            ns = pickle.load(chan)

        self.assertEqual(ns['__name__'], b'not __main__')

    def test_main_reused(self):
        # Two runs in the same interpreter: the second is expected to
        # still see the name bound by the first.
        r, w = os.pipe()
        interpreters.run_string(self.id, dedent(f"""
            spam = True

            ns = dict(vars())
            del ns['__builtins__']
            import pickle
            with open({w}, 'wb') as chan:
                pickle.dump(ns, chan)
            del ns, pickle, chan
            """))
        with open(r, 'rb') as chan:
            ns1 = pickle.load(chan)

        r, w = os.pipe()
        interpreters.run_string(self.id, dedent(f"""
            eggs = False

            ns = dict(vars())
            del ns['__builtins__']
            import pickle
            with open({w}, 'wb') as chan:
                pickle.dump(ns, chan)
            """))
        with open(r, 'rb') as chan:
            ns2 = pickle.load(chan)

        self.assertIn('spam', ns1)
        self.assertNotIn('eggs', ns1)
        self.assertIn('eggs', ns2)
        self.assertIn('spam', ns2)

    def test_execution_namespace_is_main(self):
        r, w = os.pipe()

        # __builtins__ is replaced by its str() inside the script before
        # the namespace is pickled.
        script = dedent(f"""
            spam = 42

            ns = dict(vars())
            ns['__builtins__'] = str(ns['__builtins__'])
            import pickle
            with open({w}, 'wb') as chan:
                pickle.dump(ns, chan)
            """)
        interpreters.run_string(self.id, script)
        with open(r, 'rb') as chan:
            ns = pickle.load(chan)

        # These two entries are dropped rather than compared.
        ns.pop('__builtins__')
        ns.pop('__loader__')
        self.assertEqual(ns, {
            '__name__': '__main__',
            '__annotations__': {},
            '__doc__': None,
            '__package__': None,
            '__spec__': None,
            'spam': 42,
            })

    # XXX Fix this test!
    @unittest.skip('blocking forever')
    def test_still_running_at_exit(self):
        # Spawns a separate Python process whose main script returns
        # while a thread is still running a subinterpreter.
        script = dedent("""
        from textwrap import dedent
        import threading
        import _xxsubinterpreters as _interpreters
        id = _interpreters.create()
        def f():
            _interpreters.run_string(id, dedent('''
                import time
                # Give plenty of time for the main interpreter to finish.
                time.sleep(1_000_000)
                '''))

        t = threading.Thread(target=f)
        t.start()
        """)
        with support.temp_dir() as dirname:
            filename = script_helper.make_script(dirname, 'interp', script)
            with script_helper.spawn_python(filename) as proc:
                retcode = proc.wait()

        self.assertEqual(retcode, 0)
926
927
# Run the tests when this file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()