1 import multiprocessing
2 import sys
3 import time
4 import unittest
5 from concurrent import futures
6 from concurrent.futures._base import (
7 PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future,
8 )
9 from concurrent.futures.process import _check_system_limits
10
11 from test import support
12 from test.support import threading_helper
13
14
15 def create_future(state=PENDING, exception=None, result=None):
16 f = Future()
17 f._state = state
18 f._exception = exception
19 f._result = result
20 return f
21
22
23 PENDING_FUTURE = create_future(state=PENDING)
24 RUNNING_FUTURE = create_future(state=RUNNING)
25 CANCELLED_FUTURE = create_future(state=CANCELLED)
26 CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
27 EXCEPTION_FUTURE = create_future(state=FINISHED, exception=OSError())
28 SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)
29
30
31 class ESC[4;38;5;81mBaseTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
32 def setUp(self):
33 self._thread_key = threading_helper.threading_setup()
34
35 def tearDown(self):
36 support.reap_children()
37 threading_helper.threading_cleanup(*self._thread_key)
38
39
40 class ESC[4;38;5;81mExecutorMixin:
41 worker_count = 5
42 executor_kwargs = {}
43
44 def setUp(self):
45 super().setUp()
46
47 self.t1 = time.monotonic()
48 if hasattr(self, "ctx"):
49 self.executor = self.executor_type(
50 max_workers=self.worker_count,
51 mp_context=self.get_context(),
52 **self.executor_kwargs)
53 else:
54 self.executor = self.executor_type(
55 max_workers=self.worker_count,
56 **self.executor_kwargs)
57
58 def tearDown(self):
59 self.executor.shutdown(wait=True)
60 self.executor = None
61
62 dt = time.monotonic() - self.t1
63 if support.verbose:
64 print("%.2fs" % dt, end=' ')
65 self.assertLess(dt, 300, "synchronization issue: test lasted too long")
66
67 super().tearDown()
68
69 def get_context(self):
70 return multiprocessing.get_context(self.ctx)
71
72
73 class ESC[4;38;5;81mThreadPoolMixin(ESC[4;38;5;149mExecutorMixin):
74 executor_type = futures.ThreadPoolExecutor
75
76
77 class ESC[4;38;5;81mProcessPoolForkMixin(ESC[4;38;5;149mExecutorMixin):
78 executor_type = futures.ProcessPoolExecutor
79 ctx = "fork"
80
81 def get_context(self):
82 try:
83 _check_system_limits()
84 except NotImplementedError:
85 self.skipTest("ProcessPoolExecutor unavailable on this system")
86 if sys.platform == "win32":
87 self.skipTest("require unix system")
88 return super().get_context()
89
90
91 class ESC[4;38;5;81mProcessPoolSpawnMixin(ESC[4;38;5;149mExecutorMixin):
92 executor_type = futures.ProcessPoolExecutor
93 ctx = "spawn"
94
95 def get_context(self):
96 try:
97 _check_system_limits()
98 except NotImplementedError:
99 self.skipTest("ProcessPoolExecutor unavailable on this system")
100 return super().get_context()
101
102
103 class ESC[4;38;5;81mProcessPoolForkserverMixin(ESC[4;38;5;149mExecutorMixin):
104 executor_type = futures.ProcessPoolExecutor
105 ctx = "forkserver"
106
107 def get_context(self):
108 try:
109 _check_system_limits()
110 except NotImplementedError:
111 self.skipTest("ProcessPoolExecutor unavailable on this system")
112 if sys.platform == "win32":
113 self.skipTest("require unix system")
114 return super().get_context()
115
116
117 def create_executor_tests(remote_globals, mixin, bases=(BaseTestCase,),
118 executor_mixins=(ThreadPoolMixin,
119 ProcessPoolForkMixin,
120 ProcessPoolForkserverMixin,
121 ProcessPoolSpawnMixin)):
122 def strip_mixin(name):
123 if name.endswith(('Mixin', 'Tests')):
124 return name[:-5]
125 elif name.endswith('Test'):
126 return name[:-4]
127 else:
128 return name
129
130 module = remote_globals['__name__']
131 for exe in executor_mixins:
132 name = ("%s%sTest"
133 % (strip_mixin(exe.__name__), strip_mixin(mixin.__name__)))
134 cls = type(name, (mixin,) + (exe,) + bases, {'__module__': module})
135 remote_globals[name] = cls
136
137
138 def setup_module():
139 unittest.addModuleCleanup(multiprocessing.util._cleanup_tests)
140 thread_info = threading_helper.threading_setup()
141 unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info)