1 from collections import namedtuple
2 import contextlib
3 import itertools
4 import os
5 import pickle
6 import sys
7 from textwrap import dedent
8 import threading
9 import time
10 import unittest
11
12 from test import support
13 from test.support import import_helper
14 from test.support import script_helper
15
16
17 interpreters = import_helper.import_module('_xxsubinterpreters')
18
19
20 ##################################
21 # helpers
22
23 def _captured_script(script):
24 r, w = os.pipe()
25 indented = script.replace('\n', '\n ')
26 wrapped = dedent(f"""
27 import contextlib
28 with open({w}, 'w', encoding="utf-8") as spipe:
29 with contextlib.redirect_stdout(spipe):
30 {indented}
31 """)
32 return wrapped, open(r, encoding="utf-8")
33
34
35 def _run_output(interp, request, shared=None):
36 script, rpipe = _captured_script(request)
37 with rpipe:
38 interpreters.run_string(interp, script, shared)
39 return rpipe.read()
40
41
42 def _wait_for_interp_to_run(interp, timeout=None):
43 # bpo-37224: Running this test file in multiprocesses will fail randomly.
44 # The failure reason is that the thread can't acquire the cpu to
45 # run subinterpreter eariler than the main thread in multiprocess.
46 if timeout is None:
47 timeout = support.SHORT_TIMEOUT
48 for _ in support.sleeping_retry(timeout, error=False):
49 if interpreters.is_running(interp):
50 break
51 else:
52 raise RuntimeError('interp is not running')
53
54
55 @contextlib.contextmanager
56 def _running(interp):
57 r, w = os.pipe()
58 def run():
59 interpreters.run_string(interp, dedent(f"""
60 # wait for "signal"
61 with open({r}, encoding="utf-8") as rpipe:
62 rpipe.read()
63 """))
64
65 t = threading.Thread(target=run)
66 t.start()
67 _wait_for_interp_to_run(interp)
68
69 yield
70
71 with open(w, 'w', encoding="utf-8") as spipe:
72 spipe.write('done')
73 t.join()
74
75
76 #@contextmanager
77 #def run_threaded(id, source, **shared):
78 # def run():
79 # run_interp(id, source, **shared)
80 # t = threading.Thread(target=run)
81 # t.start()
82 # yield
83 # t.join()
84
85
86 def run_interp(id, source, **shared):
87 _run_interp(id, source, shared)
88
89
90 def _run_interp(id, source, shared, _mainns={}):
91 source = dedent(source)
92 main = interpreters.get_main()
93 if main == id:
94 if interpreters.get_current() != main:
95 raise RuntimeError
96 # XXX Run a func?
97 exec(source, _mainns)
98 else:
99 interpreters.run_string(id, source, shared)
100
101
102 class ESC[4;38;5;81mInterpreter(ESC[4;38;5;149mnamedtuple('Interpreter', 'name id')):
103
104 @classmethod
105 def from_raw(cls, raw):
106 if isinstance(raw, cls):
107 return raw
108 elif isinstance(raw, str):
109 return cls(raw)
110 else:
111 raise NotImplementedError
112
113 def __new__(cls, name=None, id=None):
114 main = interpreters.get_main()
115 if id == main:
116 if not name:
117 name = 'main'
118 elif name != 'main':
119 raise ValueError(
120 'name mismatch (expected "main", got "{}")'.format(name))
121 id = main
122 elif id is not None:
123 if not name:
124 name = 'interp'
125 elif name == 'main':
126 raise ValueError('name mismatch (unexpected "main")')
127 if not isinstance(id, interpreters.InterpreterID):
128 id = interpreters.InterpreterID(id)
129 elif not name or name == 'main':
130 name = 'main'
131 id = main
132 else:
133 id = interpreters.create()
134 self = super().__new__(cls, name, id)
135 return self
136
137
138 # XXX expect_channel_closed() is unnecessary once we improve exc propagation.
139
140 @contextlib.contextmanager
141 def expect_channel_closed():
142 try:
143 yield
144 except interpreters.ChannelClosedError:
145 pass
146 else:
147 assert False, 'channel not closed'
148
149
150 class ESC[4;38;5;81mChannelAction(ESC[4;38;5;149mnamedtuple('ChannelAction', 'action end interp')):
151
152 def __new__(cls, action, end=None, interp=None):
153 if not end:
154 end = 'both'
155 if not interp:
156 interp = 'main'
157 self = super().__new__(cls, action, end, interp)
158 return self
159
160 def __init__(self, *args, **kwargs):
161 if self.action == 'use':
162 if self.end not in ('same', 'opposite', 'send', 'recv'):
163 raise ValueError(self.end)
164 elif self.action in ('close', 'force-close'):
165 if self.end not in ('both', 'same', 'opposite', 'send', 'recv'):
166 raise ValueError(self.end)
167 else:
168 raise ValueError(self.action)
169 if self.interp not in ('main', 'same', 'other', 'extra'):
170 raise ValueError(self.interp)
171
172 def resolve_end(self, end):
173 if self.end == 'same':
174 return end
175 elif self.end == 'opposite':
176 return 'recv' if end == 'send' else 'send'
177 else:
178 return self.end
179
180 def resolve_interp(self, interp, other, extra):
181 if self.interp == 'same':
182 return interp
183 elif self.interp == 'other':
184 if other is None:
185 raise RuntimeError
186 return other
187 elif self.interp == 'extra':
188 if extra is None:
189 raise RuntimeError
190 return extra
191 elif self.interp == 'main':
192 if interp.name == 'main':
193 return interp
194 elif other and other.name == 'main':
195 return other
196 else:
197 raise RuntimeError
198 # Per __init__(), there aren't any others.
199
200
201 class ESC[4;38;5;81mChannelState(ESC[4;38;5;149mnamedtuple('ChannelState', 'pending closed')):
202
203 def __new__(cls, pending=0, *, closed=False):
204 self = super().__new__(cls, pending, closed)
205 return self
206
207 def incr(self):
208 return type(self)(self.pending + 1, closed=self.closed)
209
210 def decr(self):
211 return type(self)(self.pending - 1, closed=self.closed)
212
213 def close(self, *, force=True):
214 if self.closed:
215 if not force or self.pending == 0:
216 return self
217 return type(self)(0 if force else self.pending, closed=True)
218
219
220 def run_action(cid, action, end, state, *, hideclosed=True):
221 if state.closed:
222 if action == 'use' and end == 'recv' and state.pending:
223 expectfail = False
224 else:
225 expectfail = True
226 else:
227 expectfail = False
228
229 try:
230 result = _run_action(cid, action, end, state)
231 except interpreters.ChannelClosedError:
232 if not hideclosed and not expectfail:
233 raise
234 result = state.close()
235 else:
236 if expectfail:
237 raise ... # XXX
238 return result
239
240
241 def _run_action(cid, action, end, state):
242 if action == 'use':
243 if end == 'send':
244 interpreters.channel_send(cid, b'spam')
245 return state.incr()
246 elif end == 'recv':
247 if not state.pending:
248 try:
249 interpreters.channel_recv(cid)
250 except interpreters.ChannelEmptyError:
251 return state
252 else:
253 raise Exception('expected ChannelEmptyError')
254 else:
255 interpreters.channel_recv(cid)
256 return state.decr()
257 else:
258 raise ValueError(end)
259 elif action == 'close':
260 kwargs = {}
261 if end in ('recv', 'send'):
262 kwargs[end] = True
263 interpreters.channel_close(cid, **kwargs)
264 return state.close()
265 elif action == 'force-close':
266 kwargs = {
267 'force': True,
268 }
269 if end in ('recv', 'send'):
270 kwargs[end] = True
271 interpreters.channel_close(cid, **kwargs)
272 return state.close(force=True)
273 else:
274 raise ValueError(action)
275
276
277 def clean_up_interpreters():
278 for id in interpreters.list_all():
279 if id == 0: # main
280 continue
281 try:
282 interpreters.destroy(id)
283 except RuntimeError:
284 pass # already destroyed
285
286
287 def clean_up_channels():
288 for cid in interpreters.channel_list_all():
289 try:
290 interpreters.channel_destroy(cid)
291 except interpreters.ChannelNotFoundError:
292 pass # already destroyed
293
294
295 class ESC[4;38;5;81mTestBase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
296
297 def tearDown(self):
298 clean_up_interpreters()
299 clean_up_channels()
300
301
302 ##################################
303 # misc. tests
304
305 class ESC[4;38;5;81mIsShareableTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
306
307 def test_default_shareables(self):
308 shareables = [
309 # singletons
310 None,
311 # builtin objects
312 b'spam',
313 'spam',
314 10,
315 -10,
316 ]
317 for obj in shareables:
318 with self.subTest(obj):
319 self.assertTrue(
320 interpreters.is_shareable(obj))
321
322 def test_not_shareable(self):
323 class ESC[4;38;5;81mCheese:
324 def __init__(self, name):
325 self.name = name
326 def __str__(self):
327 return self.name
328
329 class ESC[4;38;5;81mSubBytes(ESC[4;38;5;149mbytes):
330 """A subclass of a shareable type."""
331
332 not_shareables = [
333 # singletons
334 True,
335 False,
336 NotImplemented,
337 ...,
338 # builtin types and objects
339 type,
340 object,
341 object(),
342 Exception(),
343 100.0,
344 # user-defined types and objects
345 Cheese,
346 Cheese('Wensleydale'),
347 SubBytes(b'spam'),
348 ]
349 for obj in not_shareables:
350 with self.subTest(repr(obj)):
351 self.assertFalse(
352 interpreters.is_shareable(obj))
353
354
355 class ESC[4;38;5;81mShareableTypeTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
356
357 def setUp(self):
358 super().setUp()
359 self.cid = interpreters.channel_create()
360
361 def tearDown(self):
362 interpreters.channel_destroy(self.cid)
363 super().tearDown()
364
365 def _assert_values(self, values):
366 for obj in values:
367 with self.subTest(obj):
368 interpreters.channel_send(self.cid, obj)
369 got = interpreters.channel_recv(self.cid)
370
371 self.assertEqual(got, obj)
372 self.assertIs(type(got), type(obj))
373 # XXX Check the following in the channel tests?
374 #self.assertIsNot(got, obj)
375
376 def test_singletons(self):
377 for obj in [None]:
378 with self.subTest(obj):
379 interpreters.channel_send(self.cid, obj)
380 got = interpreters.channel_recv(self.cid)
381
382 # XXX What about between interpreters?
383 self.assertIs(got, obj)
384
385 def test_types(self):
386 self._assert_values([
387 b'spam',
388 9999,
389 self.cid,
390 ])
391
392 def test_bytes(self):
393 self._assert_values(i.to_bytes(2, 'little', signed=True)
394 for i in range(-1, 258))
395
396 def test_strs(self):
397 self._assert_values(['hello world', 'ä½ å¥½ä¸–ç•Œ', ''])
398
399 def test_int(self):
400 self._assert_values(itertools.chain(range(-1, 258),
401 [sys.maxsize, -sys.maxsize - 1]))
402
403 def test_non_shareable_int(self):
404 ints = [
405 sys.maxsize + 1,
406 -sys.maxsize - 2,
407 2**1000,
408 ]
409 for i in ints:
410 with self.subTest(i):
411 with self.assertRaises(OverflowError):
412 interpreters.channel_send(self.cid, i)
413
414
415 ##################################
416 # interpreter tests
417
418 class ESC[4;38;5;81mListAllTests(ESC[4;38;5;149mTestBase):
419
420 def test_initial(self):
421 main = interpreters.get_main()
422 ids = interpreters.list_all()
423 self.assertEqual(ids, [main])
424
425 def test_after_creating(self):
426 main = interpreters.get_main()
427 first = interpreters.create()
428 second = interpreters.create()
429 ids = interpreters.list_all()
430 self.assertEqual(ids, [main, first, second])
431
432 def test_after_destroying(self):
433 main = interpreters.get_main()
434 first = interpreters.create()
435 second = interpreters.create()
436 interpreters.destroy(first)
437 ids = interpreters.list_all()
438 self.assertEqual(ids, [main, second])
439
440
441 class ESC[4;38;5;81mGetCurrentTests(ESC[4;38;5;149mTestBase):
442
443 def test_main(self):
444 main = interpreters.get_main()
445 cur = interpreters.get_current()
446 self.assertEqual(cur, main)
447 self.assertIsInstance(cur, interpreters.InterpreterID)
448
449 def test_subinterpreter(self):
450 main = interpreters.get_main()
451 interp = interpreters.create()
452 out = _run_output(interp, dedent("""
453 import _xxsubinterpreters as _interpreters
454 cur = _interpreters.get_current()
455 print(cur)
456 assert isinstance(cur, _interpreters.InterpreterID)
457 """))
458 cur = int(out.strip())
459 _, expected = interpreters.list_all()
460 self.assertEqual(cur, expected)
461 self.assertNotEqual(cur, main)
462
463
464 class ESC[4;38;5;81mGetMainTests(ESC[4;38;5;149mTestBase):
465
466 def test_from_main(self):
467 [expected] = interpreters.list_all()
468 main = interpreters.get_main()
469 self.assertEqual(main, expected)
470 self.assertIsInstance(main, interpreters.InterpreterID)
471
472 def test_from_subinterpreter(self):
473 [expected] = interpreters.list_all()
474 interp = interpreters.create()
475 out = _run_output(interp, dedent("""
476 import _xxsubinterpreters as _interpreters
477 main = _interpreters.get_main()
478 print(main)
479 assert isinstance(main, _interpreters.InterpreterID)
480 """))
481 main = int(out.strip())
482 self.assertEqual(main, expected)
483
484
485 class ESC[4;38;5;81mIsRunningTests(ESC[4;38;5;149mTestBase):
486
487 def test_main(self):
488 main = interpreters.get_main()
489 self.assertTrue(interpreters.is_running(main))
490
491 @unittest.skip('Fails on FreeBSD')
492 def test_subinterpreter(self):
493 interp = interpreters.create()
494 self.assertFalse(interpreters.is_running(interp))
495
496 with _running(interp):
497 self.assertTrue(interpreters.is_running(interp))
498 self.assertFalse(interpreters.is_running(interp))
499
500 def test_from_subinterpreter(self):
501 interp = interpreters.create()
502 out = _run_output(interp, dedent(f"""
503 import _xxsubinterpreters as _interpreters
504 if _interpreters.is_running({interp}):
505 print(True)
506 else:
507 print(False)
508 """))
509 self.assertEqual(out.strip(), 'True')
510
511 def test_already_destroyed(self):
512 interp = interpreters.create()
513 interpreters.destroy(interp)
514 with self.assertRaises(RuntimeError):
515 interpreters.is_running(interp)
516
517 def test_does_not_exist(self):
518 with self.assertRaises(RuntimeError):
519 interpreters.is_running(1_000_000)
520
521 def test_bad_id(self):
522 with self.assertRaises(ValueError):
523 interpreters.is_running(-1)
524
525
526 class ESC[4;38;5;81mInterpreterIDTests(ESC[4;38;5;149mTestBase):
527
528 def test_with_int(self):
529 id = interpreters.InterpreterID(10, force=True)
530
531 self.assertEqual(int(id), 10)
532
533 def test_coerce_id(self):
534 class ESC[4;38;5;81mInt(ESC[4;38;5;149mstr):
535 def __index__(self):
536 return 10
537
538 id = interpreters.InterpreterID(Int(), force=True)
539 self.assertEqual(int(id), 10)
540
541 def test_bad_id(self):
542 self.assertRaises(TypeError, interpreters.InterpreterID, object())
543 self.assertRaises(TypeError, interpreters.InterpreterID, 10.0)
544 self.assertRaises(TypeError, interpreters.InterpreterID, '10')
545 self.assertRaises(TypeError, interpreters.InterpreterID, b'10')
546 self.assertRaises(ValueError, interpreters.InterpreterID, -1)
547 self.assertRaises(OverflowError, interpreters.InterpreterID, 2**64)
548
549 def test_does_not_exist(self):
550 id = interpreters.channel_create()
551 with self.assertRaises(RuntimeError):
552 interpreters.InterpreterID(int(id) + 1) # unforced
553
554 def test_str(self):
555 id = interpreters.InterpreterID(10, force=True)
556 self.assertEqual(str(id), '10')
557
558 def test_repr(self):
559 id = interpreters.InterpreterID(10, force=True)
560 self.assertEqual(repr(id), 'InterpreterID(10)')
561
562 def test_equality(self):
563 id1 = interpreters.create()
564 id2 = interpreters.InterpreterID(int(id1))
565 id3 = interpreters.create()
566
567 self.assertTrue(id1 == id1)
568 self.assertTrue(id1 == id2)
569 self.assertTrue(id1 == int(id1))
570 self.assertTrue(int(id1) == id1)
571 self.assertTrue(id1 == float(int(id1)))
572 self.assertTrue(float(int(id1)) == id1)
573 self.assertFalse(id1 == float(int(id1)) + 0.1)
574 self.assertFalse(id1 == str(int(id1)))
575 self.assertFalse(id1 == 2**1000)
576 self.assertFalse(id1 == float('inf'))
577 self.assertFalse(id1 == 'spam')
578 self.assertFalse(id1 == id3)
579
580 self.assertFalse(id1 != id1)
581 self.assertFalse(id1 != id2)
582 self.assertTrue(id1 != id3)
583
584
585 class ESC[4;38;5;81mCreateTests(ESC[4;38;5;149mTestBase):
586
587 def test_in_main(self):
588 id = interpreters.create()
589 self.assertIsInstance(id, interpreters.InterpreterID)
590
591 self.assertIn(id, interpreters.list_all())
592
593 @unittest.skip('enable this test when working on pystate.c')
594 def test_unique_id(self):
595 seen = set()
596 for _ in range(100):
597 id = interpreters.create()
598 interpreters.destroy(id)
599 seen.add(id)
600
601 self.assertEqual(len(seen), 100)
602
603 def test_in_thread(self):
604 lock = threading.Lock()
605 id = None
606 def f():
607 nonlocal id
608 id = interpreters.create()
609 lock.acquire()
610 lock.release()
611
612 t = threading.Thread(target=f)
613 with lock:
614 t.start()
615 t.join()
616 self.assertIn(id, interpreters.list_all())
617
618 def test_in_subinterpreter(self):
619 main, = interpreters.list_all()
620 id1 = interpreters.create()
621 out = _run_output(id1, dedent("""
622 import _xxsubinterpreters as _interpreters
623 id = _interpreters.create()
624 print(id)
625 assert isinstance(id, _interpreters.InterpreterID)
626 """))
627 id2 = int(out.strip())
628
629 self.assertEqual(set(interpreters.list_all()), {main, id1, id2})
630
631 def test_in_threaded_subinterpreter(self):
632 main, = interpreters.list_all()
633 id1 = interpreters.create()
634 id2 = None
635 def f():
636 nonlocal id2
637 out = _run_output(id1, dedent("""
638 import _xxsubinterpreters as _interpreters
639 id = _interpreters.create()
640 print(id)
641 """))
642 id2 = int(out.strip())
643
644 t = threading.Thread(target=f)
645 t.start()
646 t.join()
647
648 self.assertEqual(set(interpreters.list_all()), {main, id1, id2})
649
650 def test_after_destroy_all(self):
651 before = set(interpreters.list_all())
652 # Create 3 subinterpreters.
653 ids = []
654 for _ in range(3):
655 id = interpreters.create()
656 ids.append(id)
657 # Now destroy them.
658 for id in ids:
659 interpreters.destroy(id)
660 # Finally, create another.
661 id = interpreters.create()
662 self.assertEqual(set(interpreters.list_all()), before | {id})
663
664 def test_after_destroy_some(self):
665 before = set(interpreters.list_all())
666 # Create 3 subinterpreters.
667 id1 = interpreters.create()
668 id2 = interpreters.create()
669 id3 = interpreters.create()
670 # Now destroy 2 of them.
671 interpreters.destroy(id1)
672 interpreters.destroy(id3)
673 # Finally, create another.
674 id = interpreters.create()
675 self.assertEqual(set(interpreters.list_all()), before | {id, id2})
676
677
678 class ESC[4;38;5;81mDestroyTests(ESC[4;38;5;149mTestBase):
679
680 def test_one(self):
681 id1 = interpreters.create()
682 id2 = interpreters.create()
683 id3 = interpreters.create()
684 self.assertIn(id2, interpreters.list_all())
685 interpreters.destroy(id2)
686 self.assertNotIn(id2, interpreters.list_all())
687 self.assertIn(id1, interpreters.list_all())
688 self.assertIn(id3, interpreters.list_all())
689
690 def test_all(self):
691 before = set(interpreters.list_all())
692 ids = set()
693 for _ in range(3):
694 id = interpreters.create()
695 ids.add(id)
696 self.assertEqual(set(interpreters.list_all()), before | ids)
697 for id in ids:
698 interpreters.destroy(id)
699 self.assertEqual(set(interpreters.list_all()), before)
700
701 def test_main(self):
702 main, = interpreters.list_all()
703 with self.assertRaises(RuntimeError):
704 interpreters.destroy(main)
705
706 def f():
707 with self.assertRaises(RuntimeError):
708 interpreters.destroy(main)
709
710 t = threading.Thread(target=f)
711 t.start()
712 t.join()
713
714 def test_already_destroyed(self):
715 id = interpreters.create()
716 interpreters.destroy(id)
717 with self.assertRaises(RuntimeError):
718 interpreters.destroy(id)
719
720 def test_does_not_exist(self):
721 with self.assertRaises(RuntimeError):
722 interpreters.destroy(1_000_000)
723
724 def test_bad_id(self):
725 with self.assertRaises(ValueError):
726 interpreters.destroy(-1)
727
728 def test_from_current(self):
729 main, = interpreters.list_all()
730 id = interpreters.create()
731 script = dedent(f"""
732 import _xxsubinterpreters as _interpreters
733 try:
734 _interpreters.destroy({id})
735 except RuntimeError:
736 pass
737 """)
738
739 interpreters.run_string(id, script)
740 self.assertEqual(set(interpreters.list_all()), {main, id})
741
742 def test_from_sibling(self):
743 main, = interpreters.list_all()
744 id1 = interpreters.create()
745 id2 = interpreters.create()
746 script = dedent(f"""
747 import _xxsubinterpreters as _interpreters
748 _interpreters.destroy({id2})
749 """)
750 interpreters.run_string(id1, script)
751
752 self.assertEqual(set(interpreters.list_all()), {main, id1})
753
754 def test_from_other_thread(self):
755 id = interpreters.create()
756 def f():
757 interpreters.destroy(id)
758
759 t = threading.Thread(target=f)
760 t.start()
761 t.join()
762
763 def test_still_running(self):
764 main, = interpreters.list_all()
765 interp = interpreters.create()
766 with _running(interp):
767 self.assertTrue(interpreters.is_running(interp),
768 msg=f"Interp {interp} should be running before destruction.")
769
770 with self.assertRaises(RuntimeError,
771 msg=f"Should not be able to destroy interp {interp} while it's still running."):
772 interpreters.destroy(interp)
773 self.assertTrue(interpreters.is_running(interp))
774
775
776 class ESC[4;38;5;81mRunStringTests(ESC[4;38;5;149mTestBase):
777
778 def setUp(self):
779 super().setUp()
780 self.id = interpreters.create()
781
782 def test_success(self):
783 script, file = _captured_script('print("it worked!", end="")')
784 with file:
785 interpreters.run_string(self.id, script)
786 out = file.read()
787
788 self.assertEqual(out, 'it worked!')
789
790 def test_in_thread(self):
791 script, file = _captured_script('print("it worked!", end="")')
792 with file:
793 def f():
794 interpreters.run_string(self.id, script)
795
796 t = threading.Thread(target=f)
797 t.start()
798 t.join()
799 out = file.read()
800
801 self.assertEqual(out, 'it worked!')
802
803 def test_create_thread(self):
804 subinterp = interpreters.create(isolated=False)
805 script, file = _captured_script("""
806 import threading
807 def f():
808 print('it worked!', end='')
809
810 t = threading.Thread(target=f)
811 t.start()
812 t.join()
813 """)
814 with file:
815 interpreters.run_string(subinterp, script)
816 out = file.read()
817
818 self.assertEqual(out, 'it worked!')
819
820 @support.requires_fork()
821 def test_fork(self):
822 import tempfile
823 with tempfile.NamedTemporaryFile('w+', encoding="utf-8") as file:
824 file.write('')
825 file.flush()
826
827 expected = 'spam spam spam spam spam'
828 script = dedent(f"""
829 import os
830 try:
831 os.fork()
832 except RuntimeError:
833 with open('{file.name}', 'w', encoding='utf-8') as out:
834 out.write('{expected}')
835 """)
836 interpreters.run_string(self.id, script)
837
838 file.seek(0)
839 content = file.read()
840 self.assertEqual(content, expected)
841
842 def test_already_running(self):
843 with _running(self.id):
844 with self.assertRaises(RuntimeError):
845 interpreters.run_string(self.id, 'print("spam")')
846
847 def test_does_not_exist(self):
848 id = 0
849 while id in interpreters.list_all():
850 id += 1
851 with self.assertRaises(RuntimeError):
852 interpreters.run_string(id, 'print("spam")')
853
854 def test_error_id(self):
855 with self.assertRaises(ValueError):
856 interpreters.run_string(-1, 'print("spam")')
857
858 def test_bad_id(self):
859 with self.assertRaises(TypeError):
860 interpreters.run_string('spam', 'print("spam")')
861
862 def test_bad_script(self):
863 with self.assertRaises(TypeError):
864 interpreters.run_string(self.id, 10)
865
866 def test_bytes_for_script(self):
867 with self.assertRaises(TypeError):
868 interpreters.run_string(self.id, b'print("spam")')
869
870 @contextlib.contextmanager
871 def assert_run_failed(self, exctype, msg=None):
872 with self.assertRaises(interpreters.RunFailedError) as caught:
873 yield
874 if msg is None:
875 self.assertEqual(str(caught.exception).split(':')[0],
876 str(exctype))
877 else:
878 self.assertEqual(str(caught.exception),
879 "{}: {}".format(exctype, msg))
880
881 def test_invalid_syntax(self):
882 with self.assert_run_failed(SyntaxError):
883 # missing close paren
884 interpreters.run_string(self.id, 'print("spam"')
885
886 def test_failure(self):
887 with self.assert_run_failed(Exception, 'spam'):
888 interpreters.run_string(self.id, 'raise Exception("spam")')
889
890 def test_SystemExit(self):
891 with self.assert_run_failed(SystemExit, '42'):
892 interpreters.run_string(self.id, 'raise SystemExit(42)')
893
894 def test_sys_exit(self):
895 with self.assert_run_failed(SystemExit):
896 interpreters.run_string(self.id, dedent("""
897 import sys
898 sys.exit()
899 """))
900
901 with self.assert_run_failed(SystemExit, '42'):
902 interpreters.run_string(self.id, dedent("""
903 import sys
904 sys.exit(42)
905 """))
906
907 def test_with_shared(self):
908 r, w = os.pipe()
909
910 shared = {
911 'spam': b'ham',
912 'eggs': b'-1',
913 'cheddar': None,
914 }
915 script = dedent(f"""
916 eggs = int(eggs)
917 spam = 42
918 result = spam + eggs
919
920 ns = dict(vars())
921 del ns['__builtins__']
922 import pickle
923 with open({w}, 'wb') as chan:
924 pickle.dump(ns, chan)
925 """)
926 interpreters.run_string(self.id, script, shared)
927 with open(r, 'rb') as chan:
928 ns = pickle.load(chan)
929
930 self.assertEqual(ns['spam'], 42)
931 self.assertEqual(ns['eggs'], -1)
932 self.assertEqual(ns['result'], 41)
933 self.assertIsNone(ns['cheddar'])
934
935 def test_shared_overwrites(self):
936 interpreters.run_string(self.id, dedent("""
937 spam = 'eggs'
938 ns1 = dict(vars())
939 del ns1['__builtins__']
940 """))
941
942 shared = {'spam': b'ham'}
943 script = dedent(f"""
944 ns2 = dict(vars())
945 del ns2['__builtins__']
946 """)
947 interpreters.run_string(self.id, script, shared)
948
949 r, w = os.pipe()
950 script = dedent(f"""
951 ns = dict(vars())
952 del ns['__builtins__']
953 import pickle
954 with open({w}, 'wb') as chan:
955 pickle.dump(ns, chan)
956 """)
957 interpreters.run_string(self.id, script)
958 with open(r, 'rb') as chan:
959 ns = pickle.load(chan)
960
961 self.assertEqual(ns['ns1']['spam'], 'eggs')
962 self.assertEqual(ns['ns2']['spam'], b'ham')
963 self.assertEqual(ns['spam'], b'ham')
964
965 def test_shared_overwrites_default_vars(self):
966 r, w = os.pipe()
967
968 shared = {'__name__': b'not __main__'}
969 script = dedent(f"""
970 spam = 42
971
972 ns = dict(vars())
973 del ns['__builtins__']
974 import pickle
975 with open({w}, 'wb') as chan:
976 pickle.dump(ns, chan)
977 """)
978 interpreters.run_string(self.id, script, shared)
979 with open(r, 'rb') as chan:
980 ns = pickle.load(chan)
981
982 self.assertEqual(ns['__name__'], b'not __main__')
983
984 def test_main_reused(self):
985 r, w = os.pipe()
986 interpreters.run_string(self.id, dedent(f"""
987 spam = True
988
989 ns = dict(vars())
990 del ns['__builtins__']
991 import pickle
992 with open({w}, 'wb') as chan:
993 pickle.dump(ns, chan)
994 del ns, pickle, chan
995 """))
996 with open(r, 'rb') as chan:
997 ns1 = pickle.load(chan)
998
999 r, w = os.pipe()
1000 interpreters.run_string(self.id, dedent(f"""
1001 eggs = False
1002
1003 ns = dict(vars())
1004 del ns['__builtins__']
1005 import pickle
1006 with open({w}, 'wb') as chan:
1007 pickle.dump(ns, chan)
1008 """))
1009 with open(r, 'rb') as chan:
1010 ns2 = pickle.load(chan)
1011
1012 self.assertIn('spam', ns1)
1013 self.assertNotIn('eggs', ns1)
1014 self.assertIn('eggs', ns2)
1015 self.assertIn('spam', ns2)
1016
1017 def test_execution_namespace_is_main(self):
1018 r, w = os.pipe()
1019
1020 script = dedent(f"""
1021 spam = 42
1022
1023 ns = dict(vars())
1024 ns['__builtins__'] = str(ns['__builtins__'])
1025 import pickle
1026 with open({w}, 'wb') as chan:
1027 pickle.dump(ns, chan)
1028 """)
1029 interpreters.run_string(self.id, script)
1030 with open(r, 'rb') as chan:
1031 ns = pickle.load(chan)
1032
1033 ns.pop('__builtins__')
1034 ns.pop('__loader__')
1035 self.assertEqual(ns, {
1036 '__name__': '__main__',
1037 '__annotations__': {},
1038 '__doc__': None,
1039 '__package__': None,
1040 '__spec__': None,
1041 'spam': 42,
1042 })
1043
1044 # XXX Fix this test!
1045 @unittest.skip('blocking forever')
1046 def test_still_running_at_exit(self):
1047 script = dedent(f"""
1048 from textwrap import dedent
1049 import threading
1050 import _xxsubinterpreters as _interpreters
1051 id = _interpreters.create()
1052 def f():
1053 _interpreters.run_string(id, dedent('''
1054 import time
1055 # Give plenty of time for the main interpreter to finish.
1056 time.sleep(1_000_000)
1057 '''))
1058
1059 t = threading.Thread(target=f)
1060 t.start()
1061 """)
1062 with support.temp_dir() as dirname:
1063 filename = script_helper.make_script(dirname, 'interp', script)
1064 with script_helper.spawn_python(filename) as proc:
1065 retcode = proc.wait()
1066
1067 self.assertEqual(retcode, 0)
1068
1069
1070 ##################################
1071 # channel tests
1072
1073 class ESC[4;38;5;81mChannelIDTests(ESC[4;38;5;149mTestBase):
1074
1075 def test_default_kwargs(self):
1076 cid = interpreters._channel_id(10, force=True)
1077
1078 self.assertEqual(int(cid), 10)
1079 self.assertEqual(cid.end, 'both')
1080
1081 def test_with_kwargs(self):
1082 cid = interpreters._channel_id(10, send=True, force=True)
1083 self.assertEqual(cid.end, 'send')
1084
1085 cid = interpreters._channel_id(10, send=True, recv=False, force=True)
1086 self.assertEqual(cid.end, 'send')
1087
1088 cid = interpreters._channel_id(10, recv=True, force=True)
1089 self.assertEqual(cid.end, 'recv')
1090
1091 cid = interpreters._channel_id(10, recv=True, send=False, force=True)
1092 self.assertEqual(cid.end, 'recv')
1093
1094 cid = interpreters._channel_id(10, send=True, recv=True, force=True)
1095 self.assertEqual(cid.end, 'both')
1096
1097 def test_coerce_id(self):
1098 class ESC[4;38;5;81mInt(ESC[4;38;5;149mstr):
1099 def __index__(self):
1100 return 10
1101
1102 cid = interpreters._channel_id(Int(), force=True)
1103 self.assertEqual(int(cid), 10)
1104
1105 def test_bad_id(self):
1106 self.assertRaises(TypeError, interpreters._channel_id, object())
1107 self.assertRaises(TypeError, interpreters._channel_id, 10.0)
1108 self.assertRaises(TypeError, interpreters._channel_id, '10')
1109 self.assertRaises(TypeError, interpreters._channel_id, b'10')
1110 self.assertRaises(ValueError, interpreters._channel_id, -1)
1111 self.assertRaises(OverflowError, interpreters._channel_id, 2**64)
1112
1113 def test_bad_kwargs(self):
1114 with self.assertRaises(ValueError):
1115 interpreters._channel_id(10, send=False, recv=False)
1116
1117 def test_does_not_exist(self):
1118 cid = interpreters.channel_create()
1119 with self.assertRaises(interpreters.ChannelNotFoundError):
1120 interpreters._channel_id(int(cid) + 1) # unforced
1121
1122 def test_str(self):
1123 cid = interpreters._channel_id(10, force=True)
1124 self.assertEqual(str(cid), '10')
1125
1126 def test_repr(self):
1127 cid = interpreters._channel_id(10, force=True)
1128 self.assertEqual(repr(cid), 'ChannelID(10)')
1129
1130 cid = interpreters._channel_id(10, send=True, force=True)
1131 self.assertEqual(repr(cid), 'ChannelID(10, send=True)')
1132
1133 cid = interpreters._channel_id(10, recv=True, force=True)
1134 self.assertEqual(repr(cid), 'ChannelID(10, recv=True)')
1135
1136 cid = interpreters._channel_id(10, send=True, recv=True, force=True)
1137 self.assertEqual(repr(cid), 'ChannelID(10)')
1138
1139 def test_equality(self):
1140 cid1 = interpreters.channel_create()
1141 cid2 = interpreters._channel_id(int(cid1))
1142 cid3 = interpreters.channel_create()
1143
1144 self.assertTrue(cid1 == cid1)
1145 self.assertTrue(cid1 == cid2)
1146 self.assertTrue(cid1 == int(cid1))
1147 self.assertTrue(int(cid1) == cid1)
1148 self.assertTrue(cid1 == float(int(cid1)))
1149 self.assertTrue(float(int(cid1)) == cid1)
1150 self.assertFalse(cid1 == float(int(cid1)) + 0.1)
1151 self.assertFalse(cid1 == str(int(cid1)))
1152 self.assertFalse(cid1 == 2**1000)
1153 self.assertFalse(cid1 == float('inf'))
1154 self.assertFalse(cid1 == 'spam')
1155 self.assertFalse(cid1 == cid3)
1156
1157 self.assertFalse(cid1 != cid1)
1158 self.assertFalse(cid1 != cid2)
1159 self.assertTrue(cid1 != cid3)
1160
1161
1162 class ESC[4;38;5;81mChannelTests(ESC[4;38;5;149mTestBase):
1163
1164 def test_create_cid(self):
1165 cid = interpreters.channel_create()
1166 self.assertIsInstance(cid, interpreters.ChannelID)
1167
1168 def test_sequential_ids(self):
1169 before = interpreters.channel_list_all()
1170 id1 = interpreters.channel_create()
1171 id2 = interpreters.channel_create()
1172 id3 = interpreters.channel_create()
1173 after = interpreters.channel_list_all()
1174
1175 self.assertEqual(id2, int(id1) + 1)
1176 self.assertEqual(id3, int(id2) + 1)
1177 self.assertEqual(set(after) - set(before), {id1, id2, id3})
1178
1179 def test_ids_global(self):
1180 id1 = interpreters.create()
1181 out = _run_output(id1, dedent("""
1182 import _xxsubinterpreters as _interpreters
1183 cid = _interpreters.channel_create()
1184 print(cid)
1185 """))
1186 cid1 = int(out.strip())
1187
1188 id2 = interpreters.create()
1189 out = _run_output(id2, dedent("""
1190 import _xxsubinterpreters as _interpreters
1191 cid = _interpreters.channel_create()
1192 print(cid)
1193 """))
1194 cid2 = int(out.strip())
1195
1196 self.assertEqual(cid2, int(cid1) + 1)
1197
1198 def test_channel_list_interpreters_none(self):
1199 """Test listing interpreters for a channel with no associations."""
1200 # Test for channel with no associated interpreters.
1201 cid = interpreters.channel_create()
1202 send_interps = interpreters.channel_list_interpreters(cid, send=True)
1203 recv_interps = interpreters.channel_list_interpreters(cid, send=False)
1204 self.assertEqual(send_interps, [])
1205 self.assertEqual(recv_interps, [])
1206
1207 def test_channel_list_interpreters_basic(self):
1208 """Test basic listing channel interpreters."""
1209 interp0 = interpreters.get_main()
1210 cid = interpreters.channel_create()
1211 interpreters.channel_send(cid, "send")
1212 # Test for a channel that has one end associated to an interpreter.
1213 send_interps = interpreters.channel_list_interpreters(cid, send=True)
1214 recv_interps = interpreters.channel_list_interpreters(cid, send=False)
1215 self.assertEqual(send_interps, [interp0])
1216 self.assertEqual(recv_interps, [])
1217
1218 interp1 = interpreters.create()
1219 _run_output(interp1, dedent(f"""
1220 import _xxsubinterpreters as _interpreters
1221 obj = _interpreters.channel_recv({cid})
1222 """))
1223 # Test for channel that has both ends associated to an interpreter.
1224 send_interps = interpreters.channel_list_interpreters(cid, send=True)
1225 recv_interps = interpreters.channel_list_interpreters(cid, send=False)
1226 self.assertEqual(send_interps, [interp0])
1227 self.assertEqual(recv_interps, [interp1])
1228
1229 def test_channel_list_interpreters_multiple(self):
1230 """Test listing interpreters for a channel with many associations."""
1231 interp0 = interpreters.get_main()
1232 interp1 = interpreters.create()
1233 interp2 = interpreters.create()
1234 interp3 = interpreters.create()
1235 cid = interpreters.channel_create()
1236
1237 interpreters.channel_send(cid, "send")
1238 _run_output(interp1, dedent(f"""
1239 import _xxsubinterpreters as _interpreters
1240 _interpreters.channel_send({cid}, "send")
1241 """))
1242 _run_output(interp2, dedent(f"""
1243 import _xxsubinterpreters as _interpreters
1244 obj = _interpreters.channel_recv({cid})
1245 """))
1246 _run_output(interp3, dedent(f"""
1247 import _xxsubinterpreters as _interpreters
1248 obj = _interpreters.channel_recv({cid})
1249 """))
1250 send_interps = interpreters.channel_list_interpreters(cid, send=True)
1251 recv_interps = interpreters.channel_list_interpreters(cid, send=False)
1252 self.assertEqual(set(send_interps), {interp0, interp1})
1253 self.assertEqual(set(recv_interps), {interp2, interp3})
1254
1255 def test_channel_list_interpreters_destroyed(self):
1256 """Test listing channel interpreters with a destroyed interpreter."""
1257 interp0 = interpreters.get_main()
1258 interp1 = interpreters.create()
1259 cid = interpreters.channel_create()
1260 interpreters.channel_send(cid, "send")
1261 _run_output(interp1, dedent(f"""
1262 import _xxsubinterpreters as _interpreters
1263 obj = _interpreters.channel_recv({cid})
1264 """))
1265 # Should be one interpreter associated with each end.
1266 send_interps = interpreters.channel_list_interpreters(cid, send=True)
1267 recv_interps = interpreters.channel_list_interpreters(cid, send=False)
1268 self.assertEqual(send_interps, [interp0])
1269 self.assertEqual(recv_interps, [interp1])
1270
1271 interpreters.destroy(interp1)
1272 # Destroyed interpreter should not be listed.
1273 send_interps = interpreters.channel_list_interpreters(cid, send=True)
1274 recv_interps = interpreters.channel_list_interpreters(cid, send=False)
1275 self.assertEqual(send_interps, [interp0])
1276 self.assertEqual(recv_interps, [])
1277
1278 def test_channel_list_interpreters_released(self):
1279 """Test listing channel interpreters with a released channel."""
1280 # Set up one channel with main interpreter on the send end and two
1281 # subinterpreters on the receive end.
1282 interp0 = interpreters.get_main()
1283 interp1 = interpreters.create()
1284 interp2 = interpreters.create()
1285 cid = interpreters.channel_create()
1286 interpreters.channel_send(cid, "data")
1287 _run_output(interp1, dedent(f"""
1288 import _xxsubinterpreters as _interpreters
1289 obj = _interpreters.channel_recv({cid})
1290 """))
1291 interpreters.channel_send(cid, "data")
1292 _run_output(interp2, dedent(f"""
1293 import _xxsubinterpreters as _interpreters
1294 obj = _interpreters.channel_recv({cid})
1295 """))
1296 # Check the setup.
1297 send_interps = interpreters.channel_list_interpreters(cid, send=True)
1298 recv_interps = interpreters.channel_list_interpreters(cid, send=False)
1299 self.assertEqual(len(send_interps), 1)
1300 self.assertEqual(len(recv_interps), 2)
1301
1302 # Release the main interpreter from the send end.
1303 interpreters.channel_release(cid, send=True)
1304 # Send end should have no associated interpreters.
1305 send_interps = interpreters.channel_list_interpreters(cid, send=True)
1306 recv_interps = interpreters.channel_list_interpreters(cid, send=False)
1307 self.assertEqual(len(send_interps), 0)
1308 self.assertEqual(len(recv_interps), 2)
1309
1310 # Release one of the subinterpreters from the receive end.
1311 _run_output(interp2, dedent(f"""
1312 import _xxsubinterpreters as _interpreters
1313 _interpreters.channel_release({cid})
1314 """))
1315 # Receive end should have the released interpreter removed.
1316 send_interps = interpreters.channel_list_interpreters(cid, send=True)
1317 recv_interps = interpreters.channel_list_interpreters(cid, send=False)
1318 self.assertEqual(len(send_interps), 0)
1319 self.assertEqual(recv_interps, [interp1])
1320
1321 def test_channel_list_interpreters_closed(self):
1322 """Test listing channel interpreters with a closed channel."""
1323 interp0 = interpreters.get_main()
1324 interp1 = interpreters.create()
1325 cid = interpreters.channel_create()
1326 # Put something in the channel so that it's not empty.
1327 interpreters.channel_send(cid, "send")
1328
1329 # Check initial state.
1330 send_interps = interpreters.channel_list_interpreters(cid, send=True)
1331 recv_interps = interpreters.channel_list_interpreters(cid, send=False)
1332 self.assertEqual(len(send_interps), 1)
1333 self.assertEqual(len(recv_interps), 0)
1334
1335 # Force close the channel.
1336 interpreters.channel_close(cid, force=True)
1337 # Both ends should raise an error.
1338 with self.assertRaises(interpreters.ChannelClosedError):
1339 interpreters.channel_list_interpreters(cid, send=True)
1340 with self.assertRaises(interpreters.ChannelClosedError):
1341 interpreters.channel_list_interpreters(cid, send=False)
1342
1343 def test_channel_list_interpreters_closed_send_end(self):
1344 """Test listing channel interpreters with a channel's send end closed."""
1345 interp0 = interpreters.get_main()
1346 interp1 = interpreters.create()
1347 cid = interpreters.channel_create()
1348 # Put something in the channel so that it's not empty.
1349 interpreters.channel_send(cid, "send")
1350
1351 # Check initial state.
1352 send_interps = interpreters.channel_list_interpreters(cid, send=True)
1353 recv_interps = interpreters.channel_list_interpreters(cid, send=False)
1354 self.assertEqual(len(send_interps), 1)
1355 self.assertEqual(len(recv_interps), 0)
1356
1357 # Close the send end of the channel.
1358 interpreters.channel_close(cid, send=True)
1359 # Send end should raise an error.
1360 with self.assertRaises(interpreters.ChannelClosedError):
1361 interpreters.channel_list_interpreters(cid, send=True)
1362 # Receive end should not be closed (since channel is not empty).
1363 recv_interps = interpreters.channel_list_interpreters(cid, send=False)
1364 self.assertEqual(len(recv_interps), 0)
1365
1366 # Close the receive end of the channel from a subinterpreter.
1367 _run_output(interp1, dedent(f"""
1368 import _xxsubinterpreters as _interpreters
1369 _interpreters.channel_close({cid}, force=True)
1370 """))
1371 # Both ends should raise an error.
1372 with self.assertRaises(interpreters.ChannelClosedError):
1373 interpreters.channel_list_interpreters(cid, send=True)
1374 with self.assertRaises(interpreters.ChannelClosedError):
1375 interpreters.channel_list_interpreters(cid, send=False)
1376
1377 ####################
1378
1379 def test_send_recv_main(self):
1380 cid = interpreters.channel_create()
1381 orig = b'spam'
1382 interpreters.channel_send(cid, orig)
1383 obj = interpreters.channel_recv(cid)
1384
1385 self.assertEqual(obj, orig)
1386 self.assertIsNot(obj, orig)
1387
1388 def test_send_recv_same_interpreter(self):
1389 id1 = interpreters.create()
1390 out = _run_output(id1, dedent("""
1391 import _xxsubinterpreters as _interpreters
1392 cid = _interpreters.channel_create()
1393 orig = b'spam'
1394 _interpreters.channel_send(cid, orig)
1395 obj = _interpreters.channel_recv(cid)
1396 assert obj is not orig
1397 assert obj == orig
1398 """))
1399
1400 def test_send_recv_different_interpreters(self):
1401 cid = interpreters.channel_create()
1402 id1 = interpreters.create()
1403 out = _run_output(id1, dedent(f"""
1404 import _xxsubinterpreters as _interpreters
1405 _interpreters.channel_send({cid}, b'spam')
1406 """))
1407 obj = interpreters.channel_recv(cid)
1408
1409 self.assertEqual(obj, b'spam')
1410
1411 def test_send_recv_different_threads(self):
1412 cid = interpreters.channel_create()
1413
1414 def f():
1415 while True:
1416 try:
1417 obj = interpreters.channel_recv(cid)
1418 break
1419 except interpreters.ChannelEmptyError:
1420 time.sleep(0.1)
1421 interpreters.channel_send(cid, obj)
1422 t = threading.Thread(target=f)
1423 t.start()
1424
1425 interpreters.channel_send(cid, b'spam')
1426 t.join()
1427 obj = interpreters.channel_recv(cid)
1428
1429 self.assertEqual(obj, b'spam')
1430
1431 def test_send_recv_different_interpreters_and_threads(self):
1432 cid = interpreters.channel_create()
1433 id1 = interpreters.create()
1434 out = None
1435
1436 def f():
1437 nonlocal out
1438 out = _run_output(id1, dedent(f"""
1439 import time
1440 import _xxsubinterpreters as _interpreters
1441 while True:
1442 try:
1443 obj = _interpreters.channel_recv({cid})
1444 break
1445 except _interpreters.ChannelEmptyError:
1446 time.sleep(0.1)
1447 assert(obj == b'spam')
1448 _interpreters.channel_send({cid}, b'eggs')
1449 """))
1450 t = threading.Thread(target=f)
1451 t.start()
1452
1453 interpreters.channel_send(cid, b'spam')
1454 t.join()
1455 obj = interpreters.channel_recv(cid)
1456
1457 self.assertEqual(obj, b'eggs')
1458
1459 def test_send_not_found(self):
1460 with self.assertRaises(interpreters.ChannelNotFoundError):
1461 interpreters.channel_send(10, b'spam')
1462
1463 def test_recv_not_found(self):
1464 with self.assertRaises(interpreters.ChannelNotFoundError):
1465 interpreters.channel_recv(10)
1466
1467 def test_recv_empty(self):
1468 cid = interpreters.channel_create()
1469 with self.assertRaises(interpreters.ChannelEmptyError):
1470 interpreters.channel_recv(cid)
1471
1472 def test_recv_default(self):
1473 default = object()
1474 cid = interpreters.channel_create()
1475 obj1 = interpreters.channel_recv(cid, default)
1476 interpreters.channel_send(cid, None)
1477 interpreters.channel_send(cid, 1)
1478 interpreters.channel_send(cid, b'spam')
1479 interpreters.channel_send(cid, b'eggs')
1480 obj2 = interpreters.channel_recv(cid, default)
1481 obj3 = interpreters.channel_recv(cid, default)
1482 obj4 = interpreters.channel_recv(cid)
1483 obj5 = interpreters.channel_recv(cid, default)
1484 obj6 = interpreters.channel_recv(cid, default)
1485
1486 self.assertIs(obj1, default)
1487 self.assertIs(obj2, None)
1488 self.assertEqual(obj3, 1)
1489 self.assertEqual(obj4, b'spam')
1490 self.assertEqual(obj5, b'eggs')
1491 self.assertIs(obj6, default)
1492
1493 def test_run_string_arg_unresolved(self):
1494 cid = interpreters.channel_create()
1495 interp = interpreters.create()
1496
1497 out = _run_output(interp, dedent("""
1498 import _xxsubinterpreters as _interpreters
1499 print(cid.end)
1500 _interpreters.channel_send(cid, b'spam')
1501 """),
1502 dict(cid=cid.send))
1503 obj = interpreters.channel_recv(cid)
1504
1505 self.assertEqual(obj, b'spam')
1506 self.assertEqual(out.strip(), 'send')
1507
1508 # XXX For now there is no high-level channel into which the
1509 # sent channel ID can be converted...
1510 # Note: this test caused crashes on some buildbots (bpo-33615).
1511 @unittest.skip('disabled until high-level channels exist')
1512 def test_run_string_arg_resolved(self):
1513 cid = interpreters.channel_create()
1514 cid = interpreters._channel_id(cid, _resolve=True)
1515 interp = interpreters.create()
1516
1517 out = _run_output(interp, dedent("""
1518 import _xxsubinterpreters as _interpreters
1519 print(chan.id.end)
1520 _interpreters.channel_send(chan.id, b'spam')
1521 """),
1522 dict(chan=cid.send))
1523 obj = interpreters.channel_recv(cid)
1524
1525 self.assertEqual(obj, b'spam')
1526 self.assertEqual(out.strip(), 'send')
1527
1528 # close
1529
1530 def test_close_single_user(self):
1531 cid = interpreters.channel_create()
1532 interpreters.channel_send(cid, b'spam')
1533 interpreters.channel_recv(cid)
1534 interpreters.channel_close(cid)
1535
1536 with self.assertRaises(interpreters.ChannelClosedError):
1537 interpreters.channel_send(cid, b'eggs')
1538 with self.assertRaises(interpreters.ChannelClosedError):
1539 interpreters.channel_recv(cid)
1540
1541 def test_close_multiple_users(self):
1542 cid = interpreters.channel_create()
1543 id1 = interpreters.create()
1544 id2 = interpreters.create()
1545 interpreters.run_string(id1, dedent(f"""
1546 import _xxsubinterpreters as _interpreters
1547 _interpreters.channel_send({cid}, b'spam')
1548 """))
1549 interpreters.run_string(id2, dedent(f"""
1550 import _xxsubinterpreters as _interpreters
1551 _interpreters.channel_recv({cid})
1552 """))
1553 interpreters.channel_close(cid)
1554 with self.assertRaises(interpreters.RunFailedError) as cm:
1555 interpreters.run_string(id1, dedent(f"""
1556 _interpreters.channel_send({cid}, b'spam')
1557 """))
1558 self.assertIn('ChannelClosedError', str(cm.exception))
1559 with self.assertRaises(interpreters.RunFailedError) as cm:
1560 interpreters.run_string(id2, dedent(f"""
1561 _interpreters.channel_send({cid}, b'spam')
1562 """))
1563 self.assertIn('ChannelClosedError', str(cm.exception))
1564
1565 def test_close_multiple_times(self):
1566 cid = interpreters.channel_create()
1567 interpreters.channel_send(cid, b'spam')
1568 interpreters.channel_recv(cid)
1569 interpreters.channel_close(cid)
1570
1571 with self.assertRaises(interpreters.ChannelClosedError):
1572 interpreters.channel_close(cid)
1573
1574 def test_close_empty(self):
1575 tests = [
1576 (False, False),
1577 (True, False),
1578 (False, True),
1579 (True, True),
1580 ]
1581 for send, recv in tests:
1582 with self.subTest((send, recv)):
1583 cid = interpreters.channel_create()
1584 interpreters.channel_send(cid, b'spam')
1585 interpreters.channel_recv(cid)
1586 interpreters.channel_close(cid, send=send, recv=recv)
1587
1588 with self.assertRaises(interpreters.ChannelClosedError):
1589 interpreters.channel_send(cid, b'eggs')
1590 with self.assertRaises(interpreters.ChannelClosedError):
1591 interpreters.channel_recv(cid)
1592
1593 def test_close_defaults_with_unused_items(self):
1594 cid = interpreters.channel_create()
1595 interpreters.channel_send(cid, b'spam')
1596 interpreters.channel_send(cid, b'ham')
1597
1598 with self.assertRaises(interpreters.ChannelNotEmptyError):
1599 interpreters.channel_close(cid)
1600 interpreters.channel_recv(cid)
1601 interpreters.channel_send(cid, b'eggs')
1602
1603 def test_close_recv_with_unused_items_unforced(self):
1604 cid = interpreters.channel_create()
1605 interpreters.channel_send(cid, b'spam')
1606 interpreters.channel_send(cid, b'ham')
1607
1608 with self.assertRaises(interpreters.ChannelNotEmptyError):
1609 interpreters.channel_close(cid, recv=True)
1610 interpreters.channel_recv(cid)
1611 interpreters.channel_send(cid, b'eggs')
1612 interpreters.channel_recv(cid)
1613 interpreters.channel_recv(cid)
1614 interpreters.channel_close(cid, recv=True)
1615
1616 def test_close_send_with_unused_items_unforced(self):
1617 cid = interpreters.channel_create()
1618 interpreters.channel_send(cid, b'spam')
1619 interpreters.channel_send(cid, b'ham')
1620 interpreters.channel_close(cid, send=True)
1621
1622 with self.assertRaises(interpreters.ChannelClosedError):
1623 interpreters.channel_send(cid, b'eggs')
1624 interpreters.channel_recv(cid)
1625 interpreters.channel_recv(cid)
1626 with self.assertRaises(interpreters.ChannelClosedError):
1627 interpreters.channel_recv(cid)
1628
1629 def test_close_both_with_unused_items_unforced(self):
1630 cid = interpreters.channel_create()
1631 interpreters.channel_send(cid, b'spam')
1632 interpreters.channel_send(cid, b'ham')
1633
1634 with self.assertRaises(interpreters.ChannelNotEmptyError):
1635 interpreters.channel_close(cid, recv=True, send=True)
1636 interpreters.channel_recv(cid)
1637 interpreters.channel_send(cid, b'eggs')
1638 interpreters.channel_recv(cid)
1639 interpreters.channel_recv(cid)
1640 interpreters.channel_close(cid, recv=True)
1641
1642 def test_close_recv_with_unused_items_forced(self):
1643 cid = interpreters.channel_create()
1644 interpreters.channel_send(cid, b'spam')
1645 interpreters.channel_send(cid, b'ham')
1646 interpreters.channel_close(cid, recv=True, force=True)
1647
1648 with self.assertRaises(interpreters.ChannelClosedError):
1649 interpreters.channel_send(cid, b'eggs')
1650 with self.assertRaises(interpreters.ChannelClosedError):
1651 interpreters.channel_recv(cid)
1652
1653 def test_close_send_with_unused_items_forced(self):
1654 cid = interpreters.channel_create()
1655 interpreters.channel_send(cid, b'spam')
1656 interpreters.channel_send(cid, b'ham')
1657 interpreters.channel_close(cid, send=True, force=True)
1658
1659 with self.assertRaises(interpreters.ChannelClosedError):
1660 interpreters.channel_send(cid, b'eggs')
1661 with self.assertRaises(interpreters.ChannelClosedError):
1662 interpreters.channel_recv(cid)
1663
1664 def test_close_both_with_unused_items_forced(self):
1665 cid = interpreters.channel_create()
1666 interpreters.channel_send(cid, b'spam')
1667 interpreters.channel_send(cid, b'ham')
1668 interpreters.channel_close(cid, send=True, recv=True, force=True)
1669
1670 with self.assertRaises(interpreters.ChannelClosedError):
1671 interpreters.channel_send(cid, b'eggs')
1672 with self.assertRaises(interpreters.ChannelClosedError):
1673 interpreters.channel_recv(cid)
1674
1675 def test_close_never_used(self):
1676 cid = interpreters.channel_create()
1677 interpreters.channel_close(cid)
1678
1679 with self.assertRaises(interpreters.ChannelClosedError):
1680 interpreters.channel_send(cid, b'spam')
1681 with self.assertRaises(interpreters.ChannelClosedError):
1682 interpreters.channel_recv(cid)
1683
1684 def test_close_by_unassociated_interp(self):
1685 cid = interpreters.channel_create()
1686 interpreters.channel_send(cid, b'spam')
1687 interp = interpreters.create()
1688 interpreters.run_string(interp, dedent(f"""
1689 import _xxsubinterpreters as _interpreters
1690 _interpreters.channel_close({cid}, force=True)
1691 """))
1692 with self.assertRaises(interpreters.ChannelClosedError):
1693 interpreters.channel_recv(cid)
1694 with self.assertRaises(interpreters.ChannelClosedError):
1695 interpreters.channel_close(cid)
1696
1697 def test_close_used_multiple_times_by_single_user(self):
1698 cid = interpreters.channel_create()
1699 interpreters.channel_send(cid, b'spam')
1700 interpreters.channel_send(cid, b'spam')
1701 interpreters.channel_send(cid, b'spam')
1702 interpreters.channel_recv(cid)
1703 interpreters.channel_close(cid, force=True)
1704
1705 with self.assertRaises(interpreters.ChannelClosedError):
1706 interpreters.channel_send(cid, b'eggs')
1707 with self.assertRaises(interpreters.ChannelClosedError):
1708 interpreters.channel_recv(cid)
1709
1710 def test_channel_list_interpreters_invalid_channel(self):
1711 cid = interpreters.channel_create()
1712 # Test for invalid channel ID.
1713 with self.assertRaises(interpreters.ChannelNotFoundError):
1714 interpreters.channel_list_interpreters(1000, send=True)
1715
1716 interpreters.channel_close(cid)
1717 # Test for a channel that has been closed.
1718 with self.assertRaises(interpreters.ChannelClosedError):
1719 interpreters.channel_list_interpreters(cid, send=True)
1720
1721 def test_channel_list_interpreters_invalid_args(self):
1722 # Tests for invalid arguments passed to the API.
1723 cid = interpreters.channel_create()
1724 with self.assertRaises(TypeError):
1725 interpreters.channel_list_interpreters(cid)
1726
1727
1728 class ESC[4;38;5;81mChannelReleaseTests(ESC[4;38;5;149mTestBase):
1729
1730 # XXX Add more test coverage a la the tests for close().
1731
1732 """
1733 - main / interp / other
1734 - run in: current thread / new thread / other thread / different threads
1735 - end / opposite
1736 - force / no force
1737 - used / not used (associated / not associated)
1738 - empty / emptied / never emptied / partly emptied
1739 - closed / not closed
1740 - released / not released
1741 - creator (interp) / other
1742 - associated interpreter not running
1743 - associated interpreter destroyed
1744 """
1745
1746 """
1747 use
1748 pre-release
1749 release
1750 after
1751 check
1752 """
1753
1754 """
1755 release in: main, interp1
1756 creator: same, other (incl. interp2)
1757
1758 use: None,send,recv,send/recv in None,same,other(incl. interp2),same+other(incl. interp2),all
1759 pre-release: None,send,recv,both in None,same,other(incl. interp2),same+other(incl. interp2),all
1760 pre-release forced: None,send,recv,both in None,same,other(incl. interp2),same+other(incl. interp2),all
1761
1762 release: same
1763 release forced: same
1764
1765 use after: None,send,recv,send/recv in None,same,other(incl. interp2),same+other(incl. interp2),all
1766 release after: None,send,recv,send/recv in None,same,other(incl. interp2),same+other(incl. interp2),all
1767 check released: send/recv for same/other(incl. interp2)
1768 check closed: send/recv for same/other(incl. interp2)
1769 """
1770
1771 def test_single_user(self):
1772 cid = interpreters.channel_create()
1773 interpreters.channel_send(cid, b'spam')
1774 interpreters.channel_recv(cid)
1775 interpreters.channel_release(cid, send=True, recv=True)
1776
1777 with self.assertRaises(interpreters.ChannelClosedError):
1778 interpreters.channel_send(cid, b'eggs')
1779 with self.assertRaises(interpreters.ChannelClosedError):
1780 interpreters.channel_recv(cid)
1781
1782 def test_multiple_users(self):
1783 cid = interpreters.channel_create()
1784 id1 = interpreters.create()
1785 id2 = interpreters.create()
1786 interpreters.run_string(id1, dedent(f"""
1787 import _xxsubinterpreters as _interpreters
1788 _interpreters.channel_send({cid}, b'spam')
1789 """))
1790 out = _run_output(id2, dedent(f"""
1791 import _xxsubinterpreters as _interpreters
1792 obj = _interpreters.channel_recv({cid})
1793 _interpreters.channel_release({cid})
1794 print(repr(obj))
1795 """))
1796 interpreters.run_string(id1, dedent(f"""
1797 _interpreters.channel_release({cid})
1798 """))
1799
1800 self.assertEqual(out.strip(), "b'spam'")
1801
1802 def test_no_kwargs(self):
1803 cid = interpreters.channel_create()
1804 interpreters.channel_send(cid, b'spam')
1805 interpreters.channel_recv(cid)
1806 interpreters.channel_release(cid)
1807
1808 with self.assertRaises(interpreters.ChannelClosedError):
1809 interpreters.channel_send(cid, b'eggs')
1810 with self.assertRaises(interpreters.ChannelClosedError):
1811 interpreters.channel_recv(cid)
1812
1813 def test_multiple_times(self):
1814 cid = interpreters.channel_create()
1815 interpreters.channel_send(cid, b'spam')
1816 interpreters.channel_recv(cid)
1817 interpreters.channel_release(cid, send=True, recv=True)
1818
1819 with self.assertRaises(interpreters.ChannelClosedError):
1820 interpreters.channel_release(cid, send=True, recv=True)
1821
1822 def test_with_unused_items(self):
1823 cid = interpreters.channel_create()
1824 interpreters.channel_send(cid, b'spam')
1825 interpreters.channel_send(cid, b'ham')
1826 interpreters.channel_release(cid, send=True, recv=True)
1827
1828 with self.assertRaises(interpreters.ChannelClosedError):
1829 interpreters.channel_recv(cid)
1830
1831 def test_never_used(self):
1832 cid = interpreters.channel_create()
1833 interpreters.channel_release(cid)
1834
1835 with self.assertRaises(interpreters.ChannelClosedError):
1836 interpreters.channel_send(cid, b'spam')
1837 with self.assertRaises(interpreters.ChannelClosedError):
1838 interpreters.channel_recv(cid)
1839
1840 def test_by_unassociated_interp(self):
1841 cid = interpreters.channel_create()
1842 interpreters.channel_send(cid, b'spam')
1843 interp = interpreters.create()
1844 interpreters.run_string(interp, dedent(f"""
1845 import _xxsubinterpreters as _interpreters
1846 _interpreters.channel_release({cid})
1847 """))
1848 obj = interpreters.channel_recv(cid)
1849 interpreters.channel_release(cid)
1850
1851 with self.assertRaises(interpreters.ChannelClosedError):
1852 interpreters.channel_send(cid, b'eggs')
1853 self.assertEqual(obj, b'spam')
1854
1855 def test_close_if_unassociated(self):
1856 # XXX Something's not right with this test...
1857 cid = interpreters.channel_create()
1858 interp = interpreters.create()
1859 interpreters.run_string(interp, dedent(f"""
1860 import _xxsubinterpreters as _interpreters
1861 obj = _interpreters.channel_send({cid}, b'spam')
1862 _interpreters.channel_release({cid})
1863 """))
1864
1865 with self.assertRaises(interpreters.ChannelClosedError):
1866 interpreters.channel_recv(cid)
1867
1868 def test_partially(self):
1869 # XXX Is partial close too weird/confusing?
1870 cid = interpreters.channel_create()
1871 interpreters.channel_send(cid, None)
1872 interpreters.channel_recv(cid)
1873 interpreters.channel_send(cid, b'spam')
1874 interpreters.channel_release(cid, send=True)
1875 obj = interpreters.channel_recv(cid)
1876
1877 self.assertEqual(obj, b'spam')
1878
1879 def test_used_multiple_times_by_single_user(self):
1880 cid = interpreters.channel_create()
1881 interpreters.channel_send(cid, b'spam')
1882 interpreters.channel_send(cid, b'spam')
1883 interpreters.channel_send(cid, b'spam')
1884 interpreters.channel_recv(cid)
1885 interpreters.channel_release(cid, send=True, recv=True)
1886
1887 with self.assertRaises(interpreters.ChannelClosedError):
1888 interpreters.channel_send(cid, b'eggs')
1889 with self.assertRaises(interpreters.ChannelClosedError):
1890 interpreters.channel_recv(cid)
1891
1892
1893 class ESC[4;38;5;81mChannelCloseFixture(ESC[4;38;5;149mnamedtuple('ChannelCloseFixture',
1894 'end interp other extra creator')):
1895
1896 # Set this to True to avoid creating interpreters, e.g. when
1897 # scanning through test permutations without running them.
1898 QUICK = False
1899
1900 def __new__(cls, end, interp, other, extra, creator):
1901 assert end in ('send', 'recv')
1902 if cls.QUICK:
1903 known = {}
1904 else:
1905 interp = Interpreter.from_raw(interp)
1906 other = Interpreter.from_raw(other)
1907 extra = Interpreter.from_raw(extra)
1908 known = {
1909 interp.name: interp,
1910 other.name: other,
1911 extra.name: extra,
1912 }
1913 if not creator:
1914 creator = 'same'
1915 self = super().__new__(cls, end, interp, other, extra, creator)
1916 self._prepped = set()
1917 self._state = ChannelState()
1918 self._known = known
1919 return self
1920
1921 @property
1922 def state(self):
1923 return self._state
1924
1925 @property
1926 def cid(self):
1927 try:
1928 return self._cid
1929 except AttributeError:
1930 creator = self._get_interpreter(self.creator)
1931 self._cid = self._new_channel(creator)
1932 return self._cid
1933
1934 def get_interpreter(self, interp):
1935 interp = self._get_interpreter(interp)
1936 self._prep_interpreter(interp)
1937 return interp
1938
1939 def expect_closed_error(self, end=None):
1940 if end is None:
1941 end = self.end
1942 if end == 'recv' and self.state.closed == 'send':
1943 return False
1944 return bool(self.state.closed)
1945
1946 def prep_interpreter(self, interp):
1947 self._prep_interpreter(interp)
1948
1949 def record_action(self, action, result):
1950 self._state = result
1951
1952 def clean_up(self):
1953 clean_up_interpreters()
1954 clean_up_channels()
1955
1956 # internal methods
1957
1958 def _new_channel(self, creator):
1959 if creator.name == 'main':
1960 return interpreters.channel_create()
1961 else:
1962 ch = interpreters.channel_create()
1963 run_interp(creator.id, f"""
1964 import _xxsubinterpreters
1965 cid = _xxsubinterpreters.channel_create()
1966 # We purposefully send back an int to avoid tying the
1967 # channel to the other interpreter.
1968 _xxsubinterpreters.channel_send({ch}, int(cid))
1969 del _xxsubinterpreters
1970 """)
1971 self._cid = interpreters.channel_recv(ch)
1972 return self._cid
1973
1974 def _get_interpreter(self, interp):
1975 if interp in ('same', 'interp'):
1976 return self.interp
1977 elif interp == 'other':
1978 return self.other
1979 elif interp == 'extra':
1980 return self.extra
1981 else:
1982 name = interp
1983 try:
1984 interp = self._known[name]
1985 except KeyError:
1986 interp = self._known[name] = Interpreter(name)
1987 return interp
1988
1989 def _prep_interpreter(self, interp):
1990 if interp.id in self._prepped:
1991 return
1992 self._prepped.add(interp.id)
1993 if interp.name == 'main':
1994 return
1995 run_interp(interp.id, f"""
1996 import _xxsubinterpreters as interpreters
1997 import test.test__xxsubinterpreters as helpers
1998 ChannelState = helpers.ChannelState
1999 try:
2000 cid
2001 except NameError:
2002 cid = interpreters._channel_id({self.cid})
2003 """)
2004
2005
2006 @unittest.skip('these tests take several hours to run')
2007 class ESC[4;38;5;81mExhaustiveChannelTests(ESC[4;38;5;149mTestBase):
2008
2009 """
2010 - main / interp / other
2011 - run in: current thread / new thread / other thread / different threads
2012 - end / opposite
2013 - force / no force
2014 - used / not used (associated / not associated)
2015 - empty / emptied / never emptied / partly emptied
2016 - closed / not closed
2017 - released / not released
2018 - creator (interp) / other
2019 - associated interpreter not running
2020 - associated interpreter destroyed
2021
2022 - close after unbound
2023 """
2024
2025 """
2026 use
2027 pre-close
2028 close
2029 after
2030 check
2031 """
2032
2033 """
2034 close in: main, interp1
2035 creator: same, other, extra
2036
2037 use: None,send,recv,send/recv in None,same,other,same+other,all
2038 pre-close: None,send,recv in None,same,other,same+other,all
2039 pre-close forced: None,send,recv in None,same,other,same+other,all
2040
2041 close: same
2042 close forced: same
2043
2044 use after: None,send,recv,send/recv in None,same,other,extra,same+other,all
2045 close after: None,send,recv,send/recv in None,same,other,extra,same+other,all
2046 check closed: send/recv for same/other(incl. interp2)
2047 """
2048
2049 def iter_action_sets(self):
2050 # - used / not used (associated / not associated)
2051 # - empty / emptied / never emptied / partly emptied
2052 # - closed / not closed
2053 # - released / not released
2054
2055 # never used
2056 yield []
2057
2058 # only pre-closed (and possible used after)
2059 for closeactions in self._iter_close_action_sets('same', 'other'):
2060 yield closeactions
2061 for postactions in self._iter_post_close_action_sets():
2062 yield closeactions + postactions
2063 for closeactions in self._iter_close_action_sets('other', 'extra'):
2064 yield closeactions
2065 for postactions in self._iter_post_close_action_sets():
2066 yield closeactions + postactions
2067
2068 # used
2069 for useactions in self._iter_use_action_sets('same', 'other'):
2070 yield useactions
2071 for closeactions in self._iter_close_action_sets('same', 'other'):
2072 actions = useactions + closeactions
2073 yield actions
2074 for postactions in self._iter_post_close_action_sets():
2075 yield actions + postactions
2076 for closeactions in self._iter_close_action_sets('other', 'extra'):
2077 actions = useactions + closeactions
2078 yield actions
2079 for postactions in self._iter_post_close_action_sets():
2080 yield actions + postactions
2081 for useactions in self._iter_use_action_sets('other', 'extra'):
2082 yield useactions
2083 for closeactions in self._iter_close_action_sets('same', 'other'):
2084 actions = useactions + closeactions
2085 yield actions
2086 for postactions in self._iter_post_close_action_sets():
2087 yield actions + postactions
2088 for closeactions in self._iter_close_action_sets('other', 'extra'):
2089 actions = useactions + closeactions
2090 yield actions
2091 for postactions in self._iter_post_close_action_sets():
2092 yield actions + postactions
2093
2094 def _iter_use_action_sets(self, interp1, interp2):
2095 interps = (interp1, interp2)
2096
2097 # only recv end used
2098 yield [
2099 ChannelAction('use', 'recv', interp1),
2100 ]
2101 yield [
2102 ChannelAction('use', 'recv', interp2),
2103 ]
2104 yield [
2105 ChannelAction('use', 'recv', interp1),
2106 ChannelAction('use', 'recv', interp2),
2107 ]
2108
2109 # never emptied
2110 yield [
2111 ChannelAction('use', 'send', interp1),
2112 ]
2113 yield [
2114 ChannelAction('use', 'send', interp2),
2115 ]
2116 yield [
2117 ChannelAction('use', 'send', interp1),
2118 ChannelAction('use', 'send', interp2),
2119 ]
2120
2121 # partially emptied
2122 for interp1 in interps:
2123 for interp2 in interps:
2124 for interp3 in interps:
2125 yield [
2126 ChannelAction('use', 'send', interp1),
2127 ChannelAction('use', 'send', interp2),
2128 ChannelAction('use', 'recv', interp3),
2129 ]
2130
2131 # fully emptied
2132 for interp1 in interps:
2133 for interp2 in interps:
2134 for interp3 in interps:
2135 for interp4 in interps:
2136 yield [
2137 ChannelAction('use', 'send', interp1),
2138 ChannelAction('use', 'send', interp2),
2139 ChannelAction('use', 'recv', interp3),
2140 ChannelAction('use', 'recv', interp4),
2141 ]
2142
2143 def _iter_close_action_sets(self, interp1, interp2):
2144 ends = ('recv', 'send')
2145 interps = (interp1, interp2)
2146 for force in (True, False):
2147 op = 'force-close' if force else 'close'
2148 for interp in interps:
2149 for end in ends:
2150 yield [
2151 ChannelAction(op, end, interp),
2152 ]
2153 for recvop in ('close', 'force-close'):
2154 for sendop in ('close', 'force-close'):
2155 for recv in interps:
2156 for send in interps:
2157 yield [
2158 ChannelAction(recvop, 'recv', recv),
2159 ChannelAction(sendop, 'send', send),
2160 ]
2161
2162 def _iter_post_close_action_sets(self):
2163 for interp in ('same', 'extra', 'other'):
2164 yield [
2165 ChannelAction('use', 'recv', interp),
2166 ]
2167 yield [
2168 ChannelAction('use', 'send', interp),
2169 ]
2170
2171 def run_actions(self, fix, actions):
2172 for action in actions:
2173 self.run_action(fix, action)
2174
2175 def run_action(self, fix, action, *, hideclosed=True):
2176 end = action.resolve_end(fix.end)
2177 interp = action.resolve_interp(fix.interp, fix.other, fix.extra)
2178 fix.prep_interpreter(interp)
2179 if interp.name == 'main':
2180 result = run_action(
2181 fix.cid,
2182 action.action,
2183 end,
2184 fix.state,
2185 hideclosed=hideclosed,
2186 )
2187 fix.record_action(action, result)
2188 else:
2189 _cid = interpreters.channel_create()
2190 run_interp(interp.id, f"""
2191 result = helpers.run_action(
2192 {fix.cid},
2193 {repr(action.action)},
2194 {repr(end)},
2195 {repr(fix.state)},
2196 hideclosed={hideclosed},
2197 )
2198 interpreters.channel_send({_cid}, result.pending.to_bytes(1, 'little'))
2199 interpreters.channel_send({_cid}, b'X' if result.closed else b'')
2200 """)
2201 result = ChannelState(
2202 pending=int.from_bytes(interpreters.channel_recv(_cid), 'little'),
2203 closed=bool(interpreters.channel_recv(_cid)),
2204 )
2205 fix.record_action(action, result)
2206
2207 def iter_fixtures(self):
2208 # XXX threads?
2209 interpreters = [
2210 ('main', 'interp', 'extra'),
2211 ('interp', 'main', 'extra'),
2212 ('interp1', 'interp2', 'extra'),
2213 ('interp1', 'interp2', 'main'),
2214 ]
2215 for interp, other, extra in interpreters:
2216 for creator in ('same', 'other', 'creator'):
2217 for end in ('send', 'recv'):
2218 yield ChannelCloseFixture(end, interp, other, extra, creator)
2219
2220 def _close(self, fix, *, force):
2221 op = 'force-close' if force else 'close'
2222 close = ChannelAction(op, fix.end, 'same')
2223 if not fix.expect_closed_error():
2224 self.run_action(fix, close, hideclosed=False)
2225 else:
2226 with self.assertRaises(interpreters.ChannelClosedError):
2227 self.run_action(fix, close, hideclosed=False)
2228
2229 def _assert_closed_in_interp(self, fix, interp=None):
2230 if interp is None or interp.name == 'main':
2231 with self.assertRaises(interpreters.ChannelClosedError):
2232 interpreters.channel_recv(fix.cid)
2233 with self.assertRaises(interpreters.ChannelClosedError):
2234 interpreters.channel_send(fix.cid, b'spam')
2235 with self.assertRaises(interpreters.ChannelClosedError):
2236 interpreters.channel_close(fix.cid)
2237 with self.assertRaises(interpreters.ChannelClosedError):
2238 interpreters.channel_close(fix.cid, force=True)
2239 else:
2240 run_interp(interp.id, f"""
2241 with helpers.expect_channel_closed():
2242 interpreters.channel_recv(cid)
2243 """)
2244 run_interp(interp.id, f"""
2245 with helpers.expect_channel_closed():
2246 interpreters.channel_send(cid, b'spam')
2247 """)
2248 run_interp(interp.id, f"""
2249 with helpers.expect_channel_closed():
2250 interpreters.channel_close(cid)
2251 """)
2252 run_interp(interp.id, f"""
2253 with helpers.expect_channel_closed():
2254 interpreters.channel_close(cid, force=True)
2255 """)
2256
2257 def _assert_closed(self, fix):
2258 self.assertTrue(fix.state.closed)
2259
2260 for _ in range(fix.state.pending):
2261 interpreters.channel_recv(fix.cid)
2262 self._assert_closed_in_interp(fix)
2263
2264 for interp in ('same', 'other'):
2265 interp = fix.get_interpreter(interp)
2266 if interp.name == 'main':
2267 continue
2268 self._assert_closed_in_interp(fix, interp)
2269
2270 interp = fix.get_interpreter('fresh')
2271 self._assert_closed_in_interp(fix, interp)
2272
2273 def _iter_close_tests(self, verbose=False):
2274 i = 0
2275 for actions in self.iter_action_sets():
2276 print()
2277 for fix in self.iter_fixtures():
2278 i += 1
2279 if i > 1000:
2280 return
2281 if verbose:
2282 if (i - 1) % 6 == 0:
2283 print()
2284 print(i, fix, '({} actions)'.format(len(actions)))
2285 else:
2286 if (i - 1) % 6 == 0:
2287 print(' ', end='')
2288 print('.', end=''); sys.stdout.flush()
2289 yield i, fix, actions
2290 if verbose:
2291 print('---')
2292 print()
2293
2294 # This is useful for scanning through the possible tests.
2295 def _skim_close_tests(self):
2296 ChannelCloseFixture.QUICK = True
2297 for i, fix, actions in self._iter_close_tests():
2298 pass
2299
2300 def test_close(self):
2301 for i, fix, actions in self._iter_close_tests():
2302 with self.subTest('{} {} {}'.format(i, fix, actions)):
2303 fix.prep_interpreter(fix.interp)
2304 self.run_actions(fix, actions)
2305
2306 self._close(fix, force=False)
2307
2308 self._assert_closed(fix)
2309 # XXX Things slow down if we have too many interpreters.
2310 fix.clean_up()
2311
2312 def test_force_close(self):
2313 for i, fix, actions in self._iter_close_tests():
2314 with self.subTest('{} {} {}'.format(i, fix, actions)):
2315 fix.prep_interpreter(fix.interp)
2316 self.run_actions(fix, actions)
2317
2318 self._close(fix, force=True)
2319
2320 self._assert_closed(fix)
2321 # XXX Things slow down if we have too many interpreters.
2322 fix.clean_up()
2323
2324
2325 if __name__ == '__main__':
2326 unittest.main()